about | summary | refs | log | tree | commit | diff | stats
path: root/packages
diff options
context:
space:
mode:
Diffstat (limited to 'packages')
-rw-r--r-- packages/api/utils/assets.ts 4
-rw-r--r-- packages/api/utils/upload.ts 2
-rw-r--r-- packages/e2e_tests/docker-compose.yml 15
-rw-r--r-- packages/e2e_tests/package.json 5
-rw-r--r-- packages/e2e_tests/setup/startContainers.ts 4
-rw-r--r-- packages/e2e_tests/tests/assetdb/assetdb-utils.ts 289
-rw-r--r-- packages/e2e_tests/tests/assetdb/interface-compliance.test.ts 627
-rw-r--r-- packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts 228
-rw-r--r-- packages/e2e_tests/tests/assetdb/s3-store.test.ts 197
-rw-r--r-- packages/shared/assetdb.ts 670
-rw-r--r-- packages/shared/config.ts 21
-rw-r--r-- packages/shared/package.json 1
12 files changed, 1964 insertions, 99 deletions
diff --git a/packages/api/utils/assets.ts b/packages/api/utils/assets.ts
index d8a726a6..205e1a76 100644
--- a/packages/api/utils/assets.ts
+++ b/packages/api/utils/assets.ts
@@ -28,7 +28,7 @@ export async function serveAsset(c: Context, assetId: string, userId: string) {
const start = parseInt(parts[0], 10);
const end = parts[1] ? parseInt(parts[1], 10) : size - 1;
- const fStream = createAssetReadStream({
+ const fStream = await createAssetReadStream({
userId,
assetId,
start,
@@ -43,7 +43,7 @@ export async function serveAsset(c: Context, assetId: string, userId: string) {
await stream.pipe(toWebReadableStream(fStream));
});
} else {
- const fStream = createAssetReadStream({
+ const fStream = await createAssetReadStream({
userId,
assetId,
});
diff --git a/packages/api/utils/upload.ts b/packages/api/utils/upload.ts
index d96a0f60..daff1fb9 100644
--- a/packages/api/utils/upload.ts
+++ b/packages/api/utils/upload.ts
@@ -24,7 +24,7 @@ export function webStreamToNode(
}
export function toWebReadableStream(
- nodeStream: fs.ReadStream,
+ nodeStream: NodeJS.ReadableStream,
): ReadableStream<Uint8Array> {
const reader = nodeStream as unknown as Readable;
diff --git a/packages/e2e_tests/docker-compose.yml b/packages/e2e_tests/docker-compose.yml
index e1fe46bb..64775f46 100644
--- a/packages/e2e_tests/docker-compose.yml
+++ b/packages/e2e_tests/docker-compose.yml
@@ -35,3 +35,18 @@ services:
restart: unless-stopped
volumes:
- ./setup/html:/usr/share/nginx/html
+ minio:
+ image: minio/minio:latest
+ restart: unless-stopped
+ ports:
+ - "9000:9000"
+ - "9001:9001"
+ environment:
+ MINIO_ROOT_USER: minioadmin
+ MINIO_ROOT_PASSWORD: minioadmin
+ command: server /data --console-address ":9001"
+ volumes:
+ - minio_data:/data
+
+volumes:
+ minio_data:
diff --git a/packages/e2e_tests/package.json b/packages/e2e_tests/package.json
index 3f110838..0cbb8fb3 100644
--- a/packages/e2e_tests/package.json
+++ b/packages/e2e_tests/package.json
@@ -11,10 +11,13 @@
"lint": "oxlint .",
"lint:fix": "oxlint . --fix",
"test": "vitest run",
- "test:watch": "vitest"
+ "test:watch": "vitest",
+ "test:no-build": "E2E_TEST_NO_BUILD=1 vitest run"
},
"dependencies": {
+ "@aws-sdk/client-s3": "^3.842.0",
"@karakeep/sdk": "workspace:*",
+ "@karakeep/shared": "workspace:^0.1.0",
"@karakeep/trpc": "workspace:^0.1.0",
"superjson": "^2.2.1"
},
diff --git a/packages/e2e_tests/setup/startContainers.ts b/packages/e2e_tests/setup/startContainers.ts
index 3086d1c8..87e812a2 100644
--- a/packages/e2e_tests/setup/startContainers.ts
+++ b/packages/e2e_tests/setup/startContainers.ts
@@ -33,8 +33,10 @@ export default async function ({ provide }: GlobalSetupContext) {
const __dirname = path.dirname(fileURLToPath(import.meta.url));
const port = await getRandomPort();
+ const buildArg = process.env.E2E_TEST_NO_BUILD ? "" : "--build";
+
console.log(`Starting docker compose on port ${port}...`);
- execSync(`docker compose up --build -d`, {
+ execSync(`docker compose up ${buildArg} -d`, {
cwd: __dirname,
stdio: "inherit",
env: {
diff --git a/packages/e2e_tests/tests/assetdb/assetdb-utils.ts b/packages/e2e_tests/tests/assetdb/assetdb-utils.ts
new file mode 100644
index 00000000..a8e29ab4
--- /dev/null
+++ b/packages/e2e_tests/tests/assetdb/assetdb-utils.ts
@@ -0,0 +1,289 @@
+import * as fs from "fs";
+import * as os from "os";
+import * as path from "path";
+import {
+ CreateBucketCommand,
+ DeleteBucketCommand,
+ DeleteObjectCommand,
+ ListObjectsV2Command,
+ S3Client,
+} from "@aws-sdk/client-s3";
+
+import {
+ ASSET_TYPES,
+ AssetMetadata,
+ AssetStore,
+ LocalFileSystemAssetStore,
+ S3AssetStore,
+} from "@karakeep/shared/assetdb";
+
+/**
+ * Everything a test needs to store one asset: the owning user, the asset id,
+ * the raw bytes and the metadata saved alongside them.
+ */
+export interface TestAssetData {
+ userId: string;
+ assetId: string;
+ // Raw payload; the tests pass this as the `asset` argument of `saveAsset`.
+ content: Buffer;
+ metadata: AssetMetadata;
+}
+
+/**
+ * Builds a {@link TestAssetData} fixture with randomised ids and content.
+ *
+ * Any top-level field present in `overrides` replaces the generated default
+ * wholesale: the merge is shallow, so an overridden `metadata` object is not
+ * combined with the default one.
+ */
+export function createTestAssetData(
+  overrides: Partial<TestAssetData> = {},
+): TestAssetData {
+  // Short base-36 suffix derived from Math.random(); only used for test ids.
+  const randomSuffix = (): string => Math.random().toString(36).substring(7);
+
+  const userId = `user_${randomSuffix()}`;
+  const assetId = `asset_${randomSuffix()}`;
+  const content = Buffer.from(`Test content ${Math.random()}`);
+  const metadata: AssetMetadata = {
+    contentType: ASSET_TYPES.TEXT_HTML,
+    fileName: "test.html",
+  };
+
+  return { userId, assetId, content, metadata, ...overrides };
+}
+
+/**
+ * Returns a fixture whose content is a tiny hard-coded PNG byte sequence
+ * (signature followed by IHDR, IDAT and IEND chunks, 1x1 pixel) and whose
+ * metadata is `ASSET_TYPES.IMAGE_PNG` / `test.png`.
+ */
+export function createTestImageData(): TestAssetData {
+  // 67 bytes in total, written as hex rows of 12 bytes each (last row: 7).
+  const pngHex = [
+    "89504e470d0a1a0a0000000d",
+    "494844520000000100000001",
+    "0802000000907753de000000",
+    "0c4944415408d763f80f0000",
+    "0100015cc28a8e0000000049",
+    "454e44ae426082",
+  ].join("");
+
+  const metadata = {
+    contentType: ASSET_TYPES.IMAGE_PNG,
+    fileName: "test.png",
+  };
+
+  return createTestAssetData({ content: Buffer.from(pngHex, "hex"), metadata });
+}
+
+/**
+ * Returns a fixture whose content is a small hand-written PDF document and
+ * whose metadata is `ASSET_TYPES.APPLICATION_PDF` / `test.pdf`.
+ *
+ * The literal below declares three objects (a catalog, a pages node and one
+ * page with a 612x792 media box) followed by an xref table and trailer. The
+ * tests only round-trip these bytes; nothing here parses them.
+ */
+export function createTestPdfData(): TestAssetData {
+ // Create a minimal PDF
+ // The template literal is runtime data: every line break inside it is part of the stored bytes.
+ const pdfContent = `%PDF-1.4
+1 0 obj
+<<
+/Type /Catalog
+/Pages 2 0 R
+>>
+endobj
+
+2 0 obj
+<<
+/Type /Pages
+/Kids [3 0 R]
+/Count 1
+>>
+endobj
+
+3 0 obj
+<<
+/Type /Page
+/Parent 2 0 R
+/MediaBox [0 0 612 792]
+>>
+endobj
+
+xref
+0 4
+0000000000 65535 f
+0000000010 00000 n
+0000000053 00000 n
+0000000125 00000 n
+trailer
+<<
+/Size 4
+/Root 1 0 R
+>>
+startxref
+173
+%%EOF`;
+
+ return createTestAssetData({
+ content: Buffer.from(pdfContent),
+ metadata: {
+ contentType: ASSET_TYPES.APPLICATION_PDF,
+ fileName: "test.pdf",
+ },
+ });
+}
+
+/**
+ * Creates a uniquely named directory under the OS temp dir (name prefix
+ * `assetdb-test-`) and resolves to its path.
+ */
+export async function createTempDirectory(): Promise<string> {
+  const prefix = path.join(os.tmpdir(), "assetdb-test-");
+  return fs.promises.mkdtemp(prefix);
+}
+
+/**
+ * Best-effort recursive removal of a directory created for a test.
+ *
+ * Failures are reported through `console.warn` and never rethrown, so a
+ * cleanup problem cannot fail the test that triggered it.
+ */
+export async function cleanupTempDirectory(tempDir: string): Promise<void> {
+  const removeOptions = { recursive: true, force: true };
+  try {
+    await fs.promises.rm(tempDir, removeOptions);
+  } catch (cleanupError) {
+    console.warn(`Failed to cleanup temp directory ${tempDir}:`, cleanupError);
+  }
+}
+
+/**
+ * Single construction point for the filesystem-backed store used by the tests.
+ *
+ * @param rootPath Directory handed to the `LocalFileSystemAssetStore` constructor.
+ */
+export function createLocalFileSystemStore(
+  rootPath: string,
+): LocalFileSystemAssetStore {
+  const store = new LocalFileSystemAssetStore(rootPath);
+  return store;
+}
+
+/**
+ * Builds an `S3AssetStore` for `bucketName` on top of a fresh `S3Client`
+ * pointed at the local MinIO instance (port 9000, `minioadmin` credentials,
+ * path-style addressing).
+ */
+export function createS3Store(bucketName: string): S3AssetStore {
+  const minioCredentials = {
+    accessKeyId: "minioadmin",
+    secretAccessKey: "minioadmin",
+  };
+
+  const client = new S3Client({
+    region: "us-east-1",
+    // MinIO endpoint for testing
+    endpoint: "http://localhost:9000",
+    credentials: minioCredentials,
+    forcePathStyle: true,
+  });
+
+  return new S3AssetStore(client, bucketName);
+}
+
+/**
+ * Ensures `bucketName` exists on the local MinIO instance and resolves to the
+ * client that was used, so the caller can reuse it for teardown.
+ *
+ * A `BucketAlreadyOwnedByYou` or `BucketAlreadyExists` error is treated as
+ * success; every other error is rethrown unchanged.
+ */
+export async function createTestBucket(bucketName: string): Promise<S3Client> {
+  const client = new S3Client({
+    region: "us-east-1",
+    endpoint: "http://localhost:9000",
+    credentials: {
+      accessKeyId: "minioadmin",
+      secretAccessKey: "minioadmin",
+    },
+    forcePathStyle: true,
+  });
+
+  try {
+    await client.send(new CreateBucketCommand({ Bucket: bucketName }));
+  } catch (error: unknown) {
+    const { name } = error as { name?: string };
+    const bucketAlreadyPresent =
+      name === "BucketAlreadyOwnedByYou" || name === "BucketAlreadyExists";
+    if (!bucketAlreadyPresent) {
+      throw error;
+    }
+  }
+
+  return client;
+}
+
+/**
+ * Best-effort teardown of a test bucket: deletes every listed object page by
+ * page, then deletes the bucket itself.
+ *
+ * Any failure is reported through `console.warn` and swallowed, so teardown
+ * problems never fail the test that triggered them.
+ *
+ * @param s3Client Client used for all list/delete requests.
+ * @param bucketName Bucket to empty and remove.
+ */
+export async function cleanupTestBucket(
+  s3Client: S3Client,
+  bucketName: string,
+): Promise<void> {
+  try {
+    // List and delete all objects, following continuation tokens until the
+    // listing is exhausted.
+    let continuationToken: string | undefined;
+    do {
+      const listResponse = await s3Client.send(
+        new ListObjectsV2Command({
+          Bucket: bucketName,
+          ContinuationToken: continuationToken,
+        }),
+      );
+
+      // `Key` is optional in the listing type; skip key-less entries rather
+      // than asserting non-null and sending a request with `Key: undefined`.
+      const keys = (listResponse.Contents ?? [])
+        .map((obj: { Key?: string }) => obj.Key)
+        .filter((key): key is string => key !== undefined);
+
+      await Promise.all(
+        keys.map((key) =>
+          s3Client.send(
+            new DeleteObjectCommand({
+              Bucket: bucketName,
+              Key: key,
+            }),
+          ),
+        ),
+      );
+
+      continuationToken = listResponse.NextContinuationToken;
+    } while (continuationToken);
+
+    // Delete the bucket
+    await s3Client.send(new DeleteBucketCommand({ Bucket: bucketName }));
+  } catch (error) {
+    console.warn(`Failed to cleanup S3 bucket ${bucketName}:`, error);
+  }
+}
+
+/**
+ * Writes `content` to `fileName` inside a newly created temp directory and
+ * resolves to the full path of the file.
+ *
+ * The directory itself is not removed here; only the file path is returned.
+ */
+export async function createTempFile(
+  content: Buffer,
+  fileName: string,
+): Promise<string> {
+  const parentDir = await createTempDirectory();
+  const target = path.join(parentDir, fileName);
+  await fs.promises.writeFile(target, content);
+  return target;
+}
+
+/**
+ * Drains a readable stream and resolves to all of its data as one Buffer.
+ *
+ * Chunks that arrive as strings (for example from a stream with an encoding
+ * set, or an object-mode stream of strings) are converted with `Buffer.from`
+ * so that `Buffer.concat` never receives a non-Buffer element.
+ *
+ * @param stream Any Node readable stream; it is consumed to the end.
+ * @returns The concatenated bytes, or an empty Buffer for an empty stream.
+ */
+export async function streamToBuffer(
+  stream: NodeJS.ReadableStream,
+): Promise<Buffer> {
+  const chunks: Buffer[] = [];
+
+  for await (const chunk of stream) {
+    chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
+  }
+
+  return Buffer.concat(chunks);
+}
+
+/**
+ * Allocates a buffer of `sizeInMB` mebibytes filled with the repeating byte
+ * sequence 0, 1, ..., 255, so the data is compressible but not empty.
+ */
+export function generateLargeBuffer(sizeInMB: number): Buffer {
+  const bytesPerMB = 1024 * 1024;
+
+  // One full 0..255 cycle; Buffer.alloc repeats it across the whole buffer,
+  // which puts `i % 256` at every index i.
+  const cycle = Buffer.from(Array.from({ length: 256 }, (_, i) => i));
+
+  return Buffer.alloc(sizeInMB * bytesPerMB, cycle);
+}
+
+/**
+ * Reads the asset through `store.readAsset` and throws when either the
+ * returned asset or the returned metadata is falsy. Errors raised by
+ * `readAsset` itself propagate unchanged.
+ */
+export async function assertAssetExists(
+  store: AssetStore,
+  userId: string,
+  assetId: string,
+): Promise<void> {
+  const found = await store.readAsset({ userId, assetId });
+  const complete = Boolean(found.asset) && Boolean(found.metadata);
+  if (complete) {
+    return;
+  }
+  throw new Error(`Asset ${assetId} for user ${userId} does not exist`);
+}
+
+/**
+ * Asserts that `store.readAsset` rejects for the given user/asset pair.
+ *
+ * Any rejection from `readAsset` counts as "asset is absent". The outcome is
+ * tracked with a flag instead of by inspecting error messages, so a store
+ * error whose text happens to contain "should not exist" is not mistaken for
+ * this helper's own failure.
+ *
+ * @throws Error when `readAsset` resolves, i.e. the asset is still readable.
+ */
+export async function assertAssetNotExists(
+  store: AssetStore,
+  userId: string,
+  assetId: string,
+): Promise<void> {
+  let readSucceeded = false;
+  try {
+    await store.readAsset({ userId, assetId });
+    readSucceeded = true;
+  } catch (_error: unknown) {
+    // Expected: the store rejects reads of assets that do not exist.
+  }
+
+  if (readSucceeded) {
+    throw new Error(`Asset ${assetId} for user ${userId} should not exist`);
+  }
+}
+
+/**
+ * Collects everything yielded by `store.getAllAssets()` into an array, in
+ * iteration order, so tests can use ordinary array assertions.
+ */
+export async function getAllAssetsArray(store: AssetStore): Promise<
+  {
+    userId: string;
+    assetId: string;
+    contentType: string;
+    fileName?: string | null;
+    size: number;
+  }[]
+> {
+  const collected: Awaited<ReturnType<typeof getAllAssetsArray>> = [];
+  for await (const entry of store.getAllAssets()) {
+    collected.push(entry);
+  }
+  return collected;
+}
diff --git a/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts b/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts
new file mode 100644
index 00000000..d5288c7a
--- /dev/null
+++ b/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts
@@ -0,0 +1,627 @@
+import * as fs from "fs";
+import { afterEach, beforeEach, describe, expect, it } from "vitest";
+
+import { ASSET_TYPES, AssetStore } from "@karakeep/shared/assetdb";
+
+import {
+ assertAssetExists,
+ assertAssetNotExists,
+ cleanupTempDirectory,
+ cleanupTestBucket,
+ createLocalFileSystemStore,
+ createS3Store,
+ createTempDirectory,
+ createTempFile,
+ createTestAssetData,
+ createTestBucket,
+ createTestImageData,
+ createTestPdfData,
+ generateLargeBuffer,
+ getAllAssetsArray,
+ streamToBuffer,
+} from "./assetdb-utils";
+
+/**
+ * One store under test plus the teardown that removes whatever backing
+ * storage was created for it.
+ */
+interface TestContext {
+ store: AssetStore;
+ // Awaited from `afterEach`; removes the temp directory or S3 bucket behind `store`.
+ cleanup: () => Promise<void>;
+}
+
+/**
+ * Builds a context around a filesystem-backed store rooted in a fresh temp
+ * directory; `cleanup` removes that directory again.
+ */
+async function createLocalContext(): Promise<TestContext> {
+  const rootDir = await createTempDirectory();
+  const store = createLocalFileSystemStore(rootDir);
+  const cleanup = (): Promise<void> => cleanupTempDirectory(rootDir);
+
+  return { store, cleanup };
+}
+
+/**
+ * Builds a context around an S3-backed store that uses a randomly named
+ * bucket; `cleanup` empties and deletes that bucket with the client that
+ * created it.
+ */
+async function createS3Context(): Promise<TestContext> {
+  const suffix = Math.random().toString(36).substring(7);
+  const bucketName = `test-bucket-${suffix}`;
+
+  // The bucket has to exist before the store is pointed at it.
+  const bucketClient = await createTestBucket(bucketName);
+  const store = createS3Store(bucketName);
+  const cleanup = (): Promise<void> =>
+    cleanupTestBucket(bucketClient, bucketName);
+
+  return { store, cleanup };
+}
+
+describe.each([
+ { name: "LocalFileSystemAssetStore", createContext: createLocalContext },
+ { name: "S3AssetStore", createContext: createS3Context },
+])("AssetStore Interface Compliance - $name", ({ createContext }) => {
+ let context: TestContext;
+
+ beforeEach(async () => {
+ context = await createContext();
+ });
+
+ afterEach(async () => {
+ await context.cleanup();
+ });
+
+ describe("Basic CRUD Operations", () => {
+ it("should save and retrieve an asset", async () => {
+ const testData = createTestAssetData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { asset, metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(asset).toEqual(testData.content);
+ expect(metadata).toEqual(testData.metadata);
+ });
+
+ it("should delete an asset", async () => {
+ const testData = createTestAssetData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ await assertAssetExists(context.store, testData.userId, testData.assetId);
+
+ await context.store.deleteAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ await assertAssetNotExists(
+ context.store,
+ testData.userId,
+ testData.assetId,
+ );
+ });
+
+ it("should get asset size", async () => {
+ const testData = createTestAssetData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const size = await context.store.getAssetSize({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(size).toBe(testData.content.length);
+ });
+
+ it("should read asset metadata", async () => {
+ const testData = createTestAssetData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const metadata = await context.store.readAssetMetadata({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(metadata).toEqual(testData.metadata);
+ });
+ });
+
+ describe("Streaming Operations", () => {
+ it("should create readable stream", async () => {
+ const testData = createTestAssetData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const stream = await context.store.createAssetReadStream({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ const streamedContent = await streamToBuffer(stream);
+ expect(streamedContent).toEqual(testData.content);
+ });
+
+ it("should support range requests in streams", async () => {
+ const content = Buffer.from("0123456789abcdef");
+ const testData = createTestAssetData({ content });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const stream = await context.store.createAssetReadStream({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ start: 5,
+ end: 10,
+ });
+
+ const streamedContent = await streamToBuffer(stream);
+ expect(streamedContent.toString()).toBe("56789a");
+ });
+ });
+
+ describe("Asset Types Support", () => {
+ it("should support all required asset types", async () => {
+ const testCases = [
+ { contentType: ASSET_TYPES.IMAGE_JPEG, fileName: "test.jpg" },
+ { contentType: ASSET_TYPES.IMAGE_PNG, fileName: "test.png" },
+ { contentType: ASSET_TYPES.IMAGE_WEBP, fileName: "test.webp" },
+ { contentType: ASSET_TYPES.APPLICATION_PDF, fileName: "test.pdf" },
+ { contentType: ASSET_TYPES.TEXT_HTML, fileName: "test.html" },
+ { contentType: ASSET_TYPES.VIDEO_MP4, fileName: "test.mp4" },
+ ];
+
+ for (const { contentType, fileName } of testCases) {
+ const testData = createTestAssetData({
+ metadata: { contentType, fileName },
+ });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(metadata.contentType).toBe(contentType);
+ expect(metadata.fileName).toBe(fileName);
+ }
+ });
+
+ it("should handle large assets", async () => {
+ const largeContent = generateLargeBuffer(5); // 5MB
+ const testData = createTestAssetData({ content: largeContent });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { asset } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(asset.length).toBe(largeContent.length);
+ expect(asset).toEqual(largeContent);
+ });
+
+ it("should reject unsupported asset types", async () => {
+ const testData = createTestAssetData({
+ metadata: {
+ contentType: "unsupported/type",
+ fileName: "test.unsupported",
+ },
+ });
+
+ await expect(
+ context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ }),
+ ).rejects.toThrow("Unsupported asset type");
+ });
+ });
+
+ describe("Bulk Operations", () => {
+ it("should delete all user assets", async () => {
+ const userId = "bulk-test-user";
+ const testAssets = [
+ createTestAssetData({ userId }),
+ createTestAssetData({ userId }),
+ createTestAssetData({ userId }),
+ ];
+ const otherUserAsset = createTestAssetData(); // Different user
+
+ // Save all assets
+ await Promise.all([
+ ...testAssets.map((testData) =>
+ context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ }),
+ ),
+ context.store.saveAsset({
+ userId: otherUserAsset.userId,
+ assetId: otherUserAsset.assetId,
+ asset: otherUserAsset.content,
+ metadata: otherUserAsset.metadata,
+ }),
+ ]);
+
+ // Delete user assets
+ await context.store.deleteUserAssets({ userId });
+
+ // Verify user assets are deleted
+ for (const testData of testAssets) {
+ await assertAssetNotExists(
+ context.store,
+ testData.userId,
+ testData.assetId,
+ );
+ }
+
+ // Verify other user's asset still exists
+ await assertAssetExists(
+ context.store,
+ otherUserAsset.userId,
+ otherUserAsset.assetId,
+ );
+ });
+
+ it("should list all assets", async () => {
+ const testAssets = [
+ createTestAssetData(),
+ createTestImageData(),
+ createTestPdfData(),
+ ];
+
+ // Save all assets
+ await Promise.all(
+ testAssets.map((testData) =>
+ context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ }),
+ ),
+ );
+
+ const assets = await getAllAssetsArray(context.store);
+
+ expect(assets).toHaveLength(3);
+
+ // Verify all assets are present
+ const assetIds = assets.map((a) => a.assetId);
+ for (const testData of testAssets) {
+ expect(assetIds).toContain(testData.assetId);
+ }
+
+ // Verify asset structure
+ for (const asset of assets) {
+ expect(asset).toHaveProperty("userId");
+ expect(asset).toHaveProperty("assetId");
+ expect(asset).toHaveProperty("contentType");
+ expect(asset).toHaveProperty("size");
+ expect(typeof asset.size).toBe("number");
+ expect(asset.size).toBeGreaterThan(0);
+ }
+ });
+ });
+
+ describe("File Operations", () => {
+ it("should save asset from file and delete original file", async () => {
+ const testData = createTestAssetData();
+ const tempFile = await createTempFile(testData.content, "temp-asset.bin");
+
+ // Verify temp file exists before operation
+ expect(
+ await fs.promises
+ .access(tempFile)
+ .then(() => true)
+ .catch(() => false),
+ ).toBe(true);
+
+ await context.store.saveAssetFromFile({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ assetPath: tempFile,
+ metadata: testData.metadata,
+ });
+
+ // Verify temp file was deleted
+ expect(
+ await fs.promises
+ .access(tempFile)
+ .then(() => true)
+ .catch(() => false),
+ ).toBe(false);
+
+ // Verify asset was saved correctly
+ const { asset, metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(asset).toEqual(testData.content);
+ expect(metadata).toEqual(testData.metadata);
+ });
+ });
+
+ describe("Error Handling", () => {
+ it("should throw error for non-existent asset read", async () => {
+ await expect(
+ context.store.readAsset({
+ userId: "non-existent-user",
+ assetId: "non-existent-asset",
+ }),
+ ).rejects.toThrow();
+ });
+
+ it("should throw error for non-existent asset metadata", async () => {
+ await expect(
+ context.store.readAssetMetadata({
+ userId: "non-existent-user",
+ assetId: "non-existent-asset",
+ }),
+ ).rejects.toThrow();
+ });
+
+ it("should throw error for non-existent asset size", async () => {
+ await expect(
+ context.store.getAssetSize({
+ userId: "non-existent-user",
+ assetId: "non-existent-asset",
+ }),
+ ).rejects.toThrow();
+ });
+
+ it("should handle deleting non-existent asset gracefully", async () => {
+ // Contract checked here: deleting an asset that was never saved must
+ // resolve rather than reject, for every store implementation in the table.
+ await expect(
+ context.store.deleteAsset({
+ userId: "non-existent-user",
+ assetId: "non-existent-asset",
+ }),
+ ).resolves.not.toThrow();
+ });
+
+ it("should handle deletion of non-existent user directory gracefully", async () => {
+ // Should not throw error when user directory doesn't exist
+ await expect(
+ context.store.deleteUserAssets({ userId: "non-existent-user" }),
+ ).resolves.not.toThrow();
+ });
+
+ it("should handle non-existent asset stream appropriately", async () => {
+ const streamResult = context.store.createAssetReadStream({
+ userId: "non-existent-user",
+ assetId: "non-existent-asset",
+ });
+
+ await expect(streamResult).rejects.toThrow();
+ });
+ });
+
+ describe("Data Integrity", () => {
+ it("should preserve binary data integrity", async () => {
+ const testData = createTestImageData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { asset } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ // Verify exact binary match
+ expect(asset).toEqual(testData.content);
+
+ // Verify PNG header is intact
+ expect(asset[0]).toBe(0x89);
+ expect(asset[1]).toBe(0x50);
+ expect(asset[2]).toBe(0x4e);
+ expect(asset[3]).toBe(0x47);
+ });
+
+ it("should preserve metadata exactly", async () => {
+ const testData = createTestAssetData({
+ metadata: {
+ contentType: ASSET_TYPES.APPLICATION_PDF,
+ fileName: "test-document.pdf",
+ },
+ });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(metadata).toEqual(testData.metadata);
+ expect(metadata.contentType).toBe(ASSET_TYPES.APPLICATION_PDF);
+ expect(metadata.fileName).toBe("test-document.pdf");
+ });
+
+ it("should handle null fileName correctly", async () => {
+ const testData = createTestAssetData({
+ metadata: {
+ contentType: ASSET_TYPES.TEXT_HTML,
+ fileName: null,
+ },
+ });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(metadata.fileName).toBeNull();
+ });
+ });
+
+ describe("Concurrent Operations", () => {
+ it("should handle concurrent saves safely", async () => {
+ const testAssets = Array.from({ length: 5 }, () => createTestAssetData());
+
+ await Promise.all(
+ testAssets.map((testData) =>
+ context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ }),
+ ),
+ );
+
+ // Verify all assets were saved correctly
+ for (const testData of testAssets) {
+ const { asset, metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(asset).toEqual(testData.content);
+ expect(metadata).toEqual(testData.metadata);
+ }
+ });
+
+ it("should handle concurrent reads safely", async () => {
+ const testData = createTestAssetData();
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ // Perform multiple concurrent reads
+ const readPromises = Array.from({ length: 10 }, () =>
+ context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ }),
+ );
+
+ const results = await Promise.all(readPromises);
+
+ // Verify all reads returned the same data
+ for (const { asset, metadata } of results) {
+ expect(asset).toEqual(testData.content);
+ expect(metadata).toEqual(testData.metadata);
+ }
+ });
+ });
+
+ describe("Edge Cases", () => {
+ it("should handle empty assets", async () => {
+ const testData = createTestAssetData({
+ content: Buffer.alloc(0),
+ });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { asset } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(asset.length).toBe(0);
+
+ const size = await context.store.getAssetSize({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(size).toBe(0);
+ });
+
+ it("should handle special characters in user and asset IDs", async () => {
+ const testData = createTestAssetData({
+ userId: "user-with-special_chars.123",
+ assetId: "asset_with-special.chars_456",
+ });
+
+ await context.store.saveAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ asset: testData.content,
+ metadata: testData.metadata,
+ });
+
+ const { asset, metadata } = await context.store.readAsset({
+ userId: testData.userId,
+ assetId: testData.assetId,
+ });
+
+ expect(asset).toEqual(testData.content);
+ expect(metadata).toEqual(testData.metadata);
+ });
+ });
+});
diff --git a/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts b/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts
new file mode 100644
index 00000000..36ff837f
--- /dev/null
+++ b/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts
@@ -0,0 +1,228 @@
+import * as fs from "fs";
+import * as path from "path";
+import { afterEach, beforeEach, describe, expect, it } from "vitest";
+
+import { LocalFileSystemAssetStore } from "@karakeep/shared/assetdb";
+
+import {
+ assertAssetNotExists,
+ cleanupTempDirectory,
+ createLocalFileSystemStore,
+ createTempDirectory,
+ createTestAssetData,
+} from "./assetdb-utils";
+
+/**
+ * Resolves to `true` when `fs.promises.access` succeeds for `target` and to
+ * `false` when it rejects. Shared by every existence check in this suite.
+ */
+function pathExists(target: string): Promise<boolean> {
+  return fs.promises.access(target).then(
+    () => true,
+    () => false,
+  );
+}
+
+describe("LocalFileSystemAssetStore - Filesystem-Specific Behaviors", () => {
+  let tempDir: string;
+  let store: LocalFileSystemAssetStore;
+
+  // Every test runs against its own empty root directory.
+  beforeEach(async () => {
+    tempDir = await createTempDirectory();
+    store = createLocalFileSystemStore(tempDir);
+  });
+
+  afterEach(async () => {
+    await cleanupTempDirectory(tempDir);
+  });
+
+  describe("File System Structure", () => {
+    it("should create correct directory structure and files", async () => {
+      const testData = createTestAssetData();
+
+      await store.saveAsset({
+        userId: testData.userId,
+        assetId: testData.assetId,
+        asset: testData.content,
+        metadata: testData.metadata,
+      });
+
+      // Expected layout: <root>/<userId>/<assetId>/{asset.bin,metadata.json}
+      const assetDir = path.join(tempDir, testData.userId, testData.assetId);
+      const assetFile = path.join(assetDir, "asset.bin");
+      const metadataFile = path.join(assetDir, "metadata.json");
+
+      expect(await pathExists(assetDir)).toBe(true);
+      expect(await pathExists(assetFile)).toBe(true);
+      expect(await pathExists(metadataFile)).toBe(true);
+
+      // Verify file contents
+      const savedContent = await fs.promises.readFile(assetFile);
+      expect(savedContent).toEqual(testData.content);
+
+      const savedMetadata: unknown = JSON.parse(
+        await fs.promises.readFile(metadataFile, "utf8"),
+      );
+      expect(savedMetadata).toEqual(testData.metadata);
+    });
+
+    it("should create nested directory structure for user/asset hierarchy", async () => {
+      const userId = "user123";
+      const assetId = "asset456";
+      const testData = createTestAssetData({ userId, assetId });
+
+      await store.saveAsset({
+        userId: testData.userId,
+        assetId: testData.assetId,
+        asset: testData.content,
+        metadata: testData.metadata,
+      });
+
+      // Verify the exact directory structure
+      const userDir = path.join(tempDir, userId);
+      const assetDir = path.join(userDir, assetId);
+
+      expect(await pathExists(userDir)).toBe(true);
+      expect(await pathExists(assetDir)).toBe(true);
+
+      // Verify files exist in the correct location
+      expect(await pathExists(path.join(assetDir, "asset.bin"))).toBe(true);
+      expect(await pathExists(path.join(assetDir, "metadata.json"))).toBe(true);
+    });
+  });
+
+  describe("Directory Cleanup", () => {
+    it("should remove entire asset directory when deleting asset", async () => {
+      const testData = createTestAssetData();
+
+      await store.saveAsset({
+        userId: testData.userId,
+        assetId: testData.assetId,
+        asset: testData.content,
+        metadata: testData.metadata,
+      });
+
+      const assetDir = path.join(tempDir, testData.userId, testData.assetId);
+
+      // Verify directory exists
+      expect(await pathExists(assetDir)).toBe(true);
+
+      await store.deleteAsset({
+        userId: testData.userId,
+        assetId: testData.assetId,
+      });
+
+      // Verify entire directory was removed
+      expect(await pathExists(assetDir)).toBe(false);
+
+      await assertAssetNotExists(store, testData.userId, testData.assetId);
+    });
+
+    it("should remove entire user directory when deleting all user assets", async () => {
+      const userId = "test-user";
+      const testData1 = createTestAssetData({ userId });
+      const testData2 = createTestAssetData({ userId });
+
+      await Promise.all([
+        store.saveAsset({
+          userId: testData1.userId,
+          assetId: testData1.assetId,
+          asset: testData1.content,
+          metadata: testData1.metadata,
+        }),
+        store.saveAsset({
+          userId: testData2.userId,
+          assetId: testData2.assetId,
+          asset: testData2.content,
+          metadata: testData2.metadata,
+        }),
+      ]);
+
+      const userDir = path.join(tempDir, userId);
+
+      // Verify user directory exists
+      expect(await pathExists(userDir)).toBe(true);
+
+      await store.deleteUserAssets({ userId });
+
+      // Verify entire user directory was removed
+      expect(await pathExists(userDir)).toBe(false);
+    });
+  });
+
+  describe("File System Permissions", () => {
+    it("should create directories with appropriate permissions", async () => {
+      const testData = createTestAssetData();
+
+      await store.saveAsset({
+        userId: testData.userId,
+        assetId: testData.assetId,
+        asset: testData.content,
+        metadata: testData.metadata,
+      });
+
+      const userDir = path.join(tempDir, testData.userId);
+      const assetDir = path.join(userDir, testData.assetId);
+
+      // Both levels of the hierarchy must be real directories.
+      const userStats = await fs.promises.stat(userDir);
+      const assetStats = await fs.promises.stat(assetDir);
+
+      expect(userStats.isDirectory()).toBe(true);
+      expect(assetStats.isDirectory()).toBe(true);
+
+      // access() rejects (failing the test) unless the current process can
+      // both read and write each directory.
+      await fs.promises.access(userDir, fs.constants.R_OK | fs.constants.W_OK);
+      await fs.promises.access(assetDir, fs.constants.R_OK | fs.constants.W_OK);
+    });
+  });
+});
diff --git a/packages/e2e_tests/tests/assetdb/s3-store.test.ts b/packages/e2e_tests/tests/assetdb/s3-store.test.ts
new file mode 100644
index 00000000..c573750e
--- /dev/null
+++ b/packages/e2e_tests/tests/assetdb/s3-store.test.ts
@@ -0,0 +1,197 @@
+import { HeadObjectCommand, S3Client } from "@aws-sdk/client-s3";
+import { afterEach, beforeEach, describe, expect, it } from "vitest";
+
+import { S3AssetStore } from "@karakeep/shared/assetdb";
+
+import {
+ assertAssetExists,
+ cleanupTestBucket,
+ createS3Store,
+ createTestAssetData,
+ createTestBucket,
+} from "./assetdb-utils";
+
+/**
+ * Tests for behaviour that only exists in the S3-backed store: object key
+ * layout, user-defined object metadata and read-after-write against the test
+ * S3 endpoint. Each test runs against a freshly created bucket that is removed
+ * again afterwards.
+ */
+describe("S3AssetStore - S3-Specific Behaviors", () => {
+  let bucketName: string;
+  let s3Client: S3Client;
+  let store: S3AssetStore;
+
+  type TestAsset = ReturnType<typeof createTestAssetData>;
+
+  /** Persists a generated test asset through the store under test. */
+  const saveTestAsset = (testData: TestAsset) =>
+    store.saveAsset({
+      userId: testData.userId,
+      assetId: testData.assetId,
+      asset: testData.content,
+      metadata: testData.metadata,
+    });
+
+  /** Issues a raw HEAD request so a test can inspect what landed in the bucket. */
+  const headObject = (key: string) =>
+    s3Client.send(new HeadObjectCommand({ Bucket: bucketName, Key: key }));
+
+  beforeEach(async () => {
+    // `toString(36)` has a variable number of digits, so cutting at a fixed
+    // offset near the end can leave a very short or even empty suffix and thus
+    // a bucket name ending in "-". Take the digits right after "0." and pad
+    // them so the suffix is always eight lowercase alphanumerics.
+    const suffix = Math.random().toString(36).slice(2, 10).padEnd(8, "0");
+    bucketName = `test-bucket-${suffix}`;
+    s3Client = await createTestBucket(bucketName);
+    store = createS3Store(bucketName);
+  });
+
+  afterEach(async () => {
+    await cleanupTestBucket(s3Client, bucketName);
+  });
+
+  describe("S3 Object Structure", () => {
+    it("should store asset as single object with user-defined metadata", async () => {
+      const testData = createTestAssetData();
+      await saveTestAsset(testData);
+
+      // Verify the object exists with the expected key structure
+      const headResponse = await headObject(
+        `${testData.userId}/${testData.assetId}`,
+      );
+
+      // Verify metadata is stored in S3 user-defined metadata
+      expect(headResponse.Metadata).toBeDefined();
+      expect(headResponse.Metadata?.["x-amz-meta-content-type"]).toBe(
+        testData.metadata.contentType,
+      );
+      expect(headResponse.Metadata?.["x-amz-meta-file-name"]).toBe(
+        testData.metadata.fileName || "",
+      );
+
+      // Verify content type is set correctly on the S3 object
+      expect(headResponse.ContentType).toBe(testData.metadata.contentType);
+    });
+
+    it("should handle null fileName in metadata correctly", async () => {
+      const testData = createTestAssetData({
+        metadata: {
+          contentType: "text/html",
+          fileName: null,
+        },
+      });
+      await saveTestAsset(testData);
+
+      const headResponse = await headObject(
+        `${testData.userId}/${testData.assetId}`,
+      );
+
+      // A null fileName must not be written to the S3 metadata at all
+      expect(headResponse.Metadata?.["x-amz-meta-file-name"]).toBeUndefined();
+
+      // Verify reading back gives us null fileName
+      const metadata = await store.readAssetMetadata({
+        userId: testData.userId,
+        assetId: testData.assetId,
+      });
+      expect(metadata.fileName).toBeNull();
+    });
+  });
+
+  describe("S3 Key Structure", () => {
+    it("should use clean userId/assetId key structure", async () => {
+      const userId = "user123";
+      const assetId = "asset456";
+      const testData = createTestAssetData({ userId, assetId });
+      await saveTestAsset(testData);
+
+      // The HEAD request rejects if the object is not stored under this exact key
+      const headResponse = await headObject(`${userId}/${assetId}`);
+
+      expect(headResponse.ContentLength).toBe(testData.content.length);
+    });
+
+    it("should handle special characters in user and asset IDs", async () => {
+      const userId = "user/with/slashes";
+      const assetId = "asset-with-special-chars_123";
+      const testData = createTestAssetData({ userId, assetId });
+      await saveTestAsset(testData);
+
+      await assertAssetExists(store, testData.userId, testData.assetId);
+    });
+  });
+
+  describe("S3 Eventual Consistency", () => {
+    it("should handle immediate read after write (MinIO strong consistency)", async () => {
+      const testData = createTestAssetData();
+      await saveTestAsset(testData);
+
+      // Immediately try to read - should work with MinIO's strong consistency
+      const { asset, metadata } = await store.readAsset({
+        userId: testData.userId,
+        assetId: testData.assetId,
+      });
+
+      expect(asset).toEqual(testData.content);
+      expect(metadata).toEqual(testData.metadata);
+    });
+  });
+
+  describe("S3 Metadata Conversion", () => {
+    it("should correctly convert between AssetMetadata and S3 metadata", async () => {
+      const testCases = [
+        {
+          contentType: "image/jpeg",
+          fileName: "test-image.jpg",
+        },
+        {
+          contentType: "application/pdf",
+          fileName: "document.pdf",
+        },
+        {
+          contentType: "text/html",
+          fileName: null,
+        },
+      ];
+
+      // Sequential on purpose: every case saves and re-reads its own asset.
+      for (const metadata of testCases) {
+        const testData = createTestAssetData({ metadata });
+        await saveTestAsset(testData);
+
+        // Verify metadata round-trip
+        const retrievedMetadata = await store.readAssetMetadata({
+          userId: testData.userId,
+          assetId: testData.assetId,
+        });
+
+        expect(retrievedMetadata).toEqual(testData.metadata);
+      }
+    });
+  });
+});
diff --git a/packages/shared/assetdb.ts b/packages/shared/assetdb.ts
index 3ac92067..77050406 100644
--- a/packages/shared/assetdb.ts
+++ b/packages/shared/assetdb.ts
@@ -1,5 +1,16 @@
import * as fs from "fs";
import * as path from "path";
+import { Readable } from "stream";
+import {
+ _Object,
+ DeleteObjectCommand,
+ DeleteObjectsCommand,
+ GetObjectCommand,
+ HeadObjectCommand,
+ ListObjectsV2Command,
+ PutObjectCommand,
+ S3Client,
+} from "@aws-sdk/client-s3";
import { Glob } from "glob";
import { z } from "zod";
@@ -43,19 +54,563 @@ export const SUPPORTED_ASSET_TYPES: Set<string> = new Set<string>([
ASSET_TYPES.VIDEO_MP4,
]);
-function getAssetDir(userId: string, assetId: string) {
- return path.join(ROOT_PATH, userId, assetId);
-}
-
export const zAssetMetadataSchema = z.object({
contentType: z.string(),
fileName: z.string().nullish(),
});
+/** Metadata stored next to every asset; the shape is derived from `zAssetMetadataSchema`. */
+export type AssetMetadata = z.infer<typeof zAssetMetadataSchema>;
+
+/**
+ * Description of one stored asset, as yielded by `AssetStore.getAllAssets()`:
+ * the owning user, the asset id, the asset's metadata fields and its size.
+ */
+export interface AssetInfo {
+ /** Id of the user the asset belongs to. */
+ userId: string;
+ /** Id of the asset within that user's assets. */
+ assetId: string;
+ /** Content type recorded in the asset's metadata. */
+ contentType: string;
+ /** Original file name from the metadata; may be absent or null. */
+ fileName?: string | null;
+ /** Size of the stored asset content in bytes. */
+ size: number;
+}
+
+/**
+ * Storage backend contract for user assets.
+ *
+ * An asset is addressed by the pair (userId, assetId) and consists of its raw
+ * content plus an `AssetMetadata` record. This file provides a local file
+ * system implementation and an S3 implementation.
+ */
+export interface AssetStore {
+ /**
+ * Stores `asset` together with `metadata` under (userId, assetId).
+ * Both implementations in this file reject content types that are not in
+ * `SUPPORTED_ASSET_TYPES` with an `Error("Unsupported asset type")`.
+ */
+ saveAsset(params: {
+ userId: string;
+ assetId: string;
+ asset: Buffer;
+ metadata: AssetMetadata;
+ }): Promise<void>;
+
+ /**
+ * Stores the content of the file at `assetPath` together with `metadata`.
+ * Both implementations in this file remove the file at `assetPath` once it
+ * has been stored, and apply the same content type check as `saveAsset`.
+ */
+ saveAssetFromFile(params: {
+ userId: string;
+ assetId: string;
+ assetPath: string;
+ metadata: AssetMetadata;
+ }): Promise<void>;
+
+ /** Reads the complete asset content into memory along with its metadata. */
+ readAsset(params: {
+ userId: string;
+ assetId: string;
+ }): Promise<{ asset: Buffer; metadata: AssetMetadata }>;
+
+ /**
+ * Opens a readable stream over the asset content. `start` and `end` are
+ * optional byte offsets delimiting the part to stream.
+ * NOTE(review): both bounds are presumably inclusive (fs read stream / HTTP
+ * range semantics) - confirm against callers.
+ */
+ createAssetReadStream(params: {
+ userId: string;
+ assetId: string;
+ start?: number;
+ end?: number;
+ }): Promise<NodeJS.ReadableStream>;
+
+ /** Reads only the metadata of an asset, without loading its content. */
+ readAssetMetadata(params: {
+ userId: string;
+ assetId: string;
+ }): Promise<AssetMetadata>;
+
+ /** Returns the size of the asset content in bytes. */
+ getAssetSize(params: { userId: string; assetId: string }): Promise<number>;
+
+ /** Removes a single asset. */
+ deleteAsset(params: { userId: string; assetId: string }): Promise<void>;
+
+ /** Removes every asset stored for `userId`. */
+ deleteUserAssets(params: { userId: string }): Promise<void>;
+
+ /** Iterates over all assets of all users, yielding one `AssetInfo` per asset. */
+ getAllAssets(): AsyncGenerator<AssetInfo>;
+}
+
export function newAssetId() {
return crypto.randomUUID();
}
+/**
+ * Asset store backed by the local file system.
+ *
+ * Every asset gets its own directory, `<rootPath>/<userId>/<assetId>/`, which
+ * holds the raw bytes in `asset.bin` and the JSON-encoded metadata in
+ * `metadata.json`.
+ */
+class LocalFileSystemAssetStore implements AssetStore {
+  private static readonly ASSET_FILE = "asset.bin";
+  private static readonly METADATA_FILE = "metadata.json";
+
+  private readonly rootPath: string;
+
+  /** @param rootPath Directory below which all user directories are created. */
+  constructor(rootPath: string) {
+    this.rootPath = rootPath;
+  }
+
+  /** Directory that holds the content and metadata files of one asset. */
+  private getAssetDir(userId: string, assetId: string) {
+    return path.join(this.rootPath, userId, assetId);
+  }
+
+  /** Resolves to true when `filePath` is accessible, false otherwise. */
+  private async isPathExists(filePath: string) {
+    return fs.promises
+      .access(filePath)
+      .then(() => true)
+      .catch(() => false);
+  }
+
+  /** Throws when the content type is not one of `SUPPORTED_ASSET_TYPES`. */
+  private assertSupported(metadata: AssetMetadata) {
+    if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+      throw new Error("Unsupported asset type");
+    }
+  }
+
+  /** Creates the asset directory (and any missing parents) and returns its path. */
+  private async ensureAssetDir(userId: string, assetId: string) {
+    const assetDir = this.getAssetDir(userId, assetId);
+    await fs.promises.mkdir(assetDir, { recursive: true });
+    return assetDir;
+  }
+
+  /** Serialises `metadata` as JSON into the asset directory. */
+  private writeMetadata(assetDir: string, metadata: AssetMetadata) {
+    return fs.promises.writeFile(
+      path.join(assetDir, LocalFileSystemAssetStore.METADATA_FILE),
+      JSON.stringify(metadata),
+    );
+  }
+
+  /**
+   * Writes the asset bytes and its metadata to the asset directory.
+   * @throws Error("Unsupported asset type") for unsupported content types.
+   */
+  async saveAsset({
+    userId,
+    assetId,
+    asset,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    asset: Buffer;
+    metadata: AssetMetadata;
+  }) {
+    this.assertSupported(metadata);
+    const assetDir = await this.ensureAssetDir(userId, assetId);
+
+    await Promise.all([
+      fs.promises.writeFile(
+        path.join(assetDir, LocalFileSystemAssetStore.ASSET_FILE),
+        Uint8Array.from(asset),
+      ),
+      this.writeMetadata(assetDir, metadata),
+    ]);
+  }
+
+  /**
+   * Imports the file at `assetPath` as the asset content and then removes the
+   * source file.
+   * @throws Error("Unsupported asset type") for unsupported content types.
+   */
+  async saveAssetFromFile({
+    userId,
+    assetId,
+    assetPath,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    assetPath: string;
+    metadata: AssetMetadata;
+  }) {
+    this.assertSupported(metadata);
+    const assetDir = await this.ensureAssetDir(userId, assetId);
+
+    await Promise.all([
+      // Copy first and delete afterwards rather than rename: inside the docker
+      // container a file can't be moved between mounts.
+      fs.promises.copyFile(
+        assetPath,
+        path.join(assetDir, LocalFileSystemAssetStore.ASSET_FILE),
+      ),
+      this.writeMetadata(assetDir, metadata),
+    ]);
+    await fs.promises.rm(assetPath);
+  }
+
+  /** Loads the whole asset and its schema-validated metadata into memory. */
+  async readAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    const assetDir = this.getAssetDir(userId, assetId);
+
+    const [asset, metadataStr] = await Promise.all([
+      fs.promises.readFile(
+        path.join(assetDir, LocalFileSystemAssetStore.ASSET_FILE),
+      ),
+      fs.promises.readFile(
+        path.join(assetDir, LocalFileSystemAssetStore.METADATA_FILE),
+        { encoding: "utf8" },
+      ),
+    ]);
+
+    const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+    return { asset, metadata };
+  }
+
+  /**
+   * Opens a read stream over the asset content, optionally limited to the
+   * byte offsets `start`..`end`, which are handed to `fs.createReadStream`.
+   * @throws Error when the asset content file does not exist.
+   */
+  async createAssetReadStream({
+    userId,
+    assetId,
+    start,
+    end,
+  }: {
+    userId: string;
+    assetId: string;
+    start?: number;
+    end?: number;
+  }) {
+    const assetPath = path.join(
+      this.getAssetDir(userId, assetId),
+      LocalFileSystemAssetStore.ASSET_FILE,
+    );
+    // Check up front so a missing asset rejects this call instead of surfacing
+    // later as an 'error' event on the returned stream.
+    if (!(await this.isPathExists(assetPath))) {
+      throw new Error(`Asset ${assetId} not found`);
+    }
+
+    return fs.createReadStream(assetPath, { start, end });
+  }
+
+  /** Reads and schema-validates the metadata file of an asset. */
+  async readAssetMetadata({
+    userId,
+    assetId,
+  }: {
+    userId: string;
+    assetId: string;
+  }) {
+    const metadataStr = await fs.promises.readFile(
+      path.join(
+        this.getAssetDir(userId, assetId),
+        LocalFileSystemAssetStore.METADATA_FILE,
+      ),
+      { encoding: "utf8" },
+    );
+
+    return zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+  }
+
+  /** Size in bytes of the stored asset content file. */
+  async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) {
+    const stat = await fs.promises.stat(
+      path.join(
+        this.getAssetDir(userId, assetId),
+        LocalFileSystemAssetStore.ASSET_FILE,
+      ),
+    );
+    return stat.size;
+  }
+
+  /** Removes the asset directory; deleting a missing asset is a no-op. */
+  async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    // `force` makes rm() ignore a missing path atomically. A separate
+    // existence check would race with concurrent deletes and would also hide
+    // real access errors behind a silent return.
+    await fs.promises.rm(this.getAssetDir(userId, assetId), {
+      recursive: true,
+      force: true,
+    });
+  }
+
+  /** Removes the whole directory of a user; a missing directory is a no-op. */
+  async deleteUserAssets({ userId }: { userId: string }) {
+    await fs.promises.rm(path.join(this.rootPath, userId), {
+      recursive: true,
+      force: true,
+    });
+  }
+
+  /**
+   * Walks the root directory for asset content files and yields one
+   * `AssetInfo`-shaped record per asset found.
+   */
+  async *getAllAssets() {
+    const g = new Glob(`/**/**/asset.bin`, {
+      maxDepth: 3,
+      root: this.rootPath,
+      cwd: this.rootPath,
+      absolute: false,
+    });
+    for await (const file of g) {
+      // Matches are relative to rootPath: <userId>/<assetId>/asset.bin
+      const [userId, assetId] = file.split("/").slice(0, 2);
+      const [size, metadata] = await Promise.all([
+        this.getAssetSize({ userId, assetId }),
+        this.readAssetMetadata({ userId, assetId }),
+      ]);
+      yield {
+        userId,
+        assetId,
+        ...metadata,
+        size,
+      };
+    }
+  }
+}
+
+/**
+ * Asset store backed by an S3 bucket.
+ *
+ * Every asset is a single object stored under the key `<userId>/<assetId>`.
+ * The asset metadata travels with the object as S3 user-defined metadata, so
+ * there is no separate metadata object.
+ */
+class S3AssetStore implements AssetStore {
+  private readonly s3Client: S3Client;
+  private readonly bucketName: string;
+
+  /**
+   * @param s3Client Configured client used for every request.
+   * @param bucketName Bucket that holds all assets.
+   */
+  constructor(s3Client: S3Client, bucketName: string) {
+    this.s3Client = s3Client;
+    this.bucketName = bucketName;
+  }
+
+  /** Object key of an asset. */
+  private getAssetKey(userId: string, assetId: string) {
+    return `${userId}/${assetId}`;
+  }
+
+  /** Throws when the content type is not one of `SUPPORTED_ASSET_TYPES`. */
+  private assertSupported(metadata: AssetMetadata) {
+    if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
+      throw new Error("Unsupported asset type");
+    }
+  }
+
+  /**
+   * Converts asset metadata into the user-defined metadata map of the object.
+   * A missing, null or empty file name is omitted entirely.
+   *
+   * NOTE(review): the AWS SDK is understood to add the `x-amz-meta-` header
+   * prefix on its own, which would make the prefix below part of the stored
+   * key name - confirm. The keys are left untouched because the S3-specific
+   * tests read exactly these names back.
+   */
+  private metadataToS3Metadata(
+    metadata: AssetMetadata,
+  ): Record<string, string> {
+    const s3Metadata: Record<string, string> = {};
+    if (metadata.fileName) {
+      s3Metadata["x-amz-meta-file-name"] = metadata.fileName;
+    }
+    s3Metadata["x-amz-meta-content-type"] = metadata.contentType;
+    return s3Metadata;
+  }
+
+  /**
+   * Inverse of `metadataToS3Metadata`. A missing file name entry becomes
+   * `null`; a missing content type entry becomes the empty string.
+   * @throws Error when the object carries no metadata map at all.
+   */
+  private s3MetadataToMetadata(
+    s3Metadata: Record<string, string> | undefined,
+  ): AssetMetadata {
+    if (!s3Metadata) {
+      throw new Error("No metadata found in S3 object");
+    }
+
+    return {
+      contentType: s3Metadata["x-amz-meta-content-type"] || "",
+      fileName: s3Metadata["x-amz-meta-file-name"] ?? null,
+    };
+  }
+
+  /**
+   * Uploads the asset bytes as one object carrying its metadata.
+   * @throws Error("Unsupported asset type") for unsupported content types.
+   */
+  async saveAsset({
+    userId,
+    assetId,
+    asset,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    asset: Buffer;
+    metadata: AssetMetadata;
+  }) {
+    this.assertSupported(metadata);
+
+    await this.s3Client.send(
+      new PutObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+        Body: asset,
+        ContentType: metadata.contentType,
+        Metadata: this.metadataToS3Metadata(metadata),
+      }),
+    );
+  }
+
+  /**
+   * Streams the file at `assetPath` into the bucket and removes the local
+   * file once the upload has succeeded.
+   * @throws Error("Unsupported asset type") for unsupported content types.
+   */
+  async saveAssetFromFile({
+    userId,
+    assetId,
+    assetPath,
+    metadata,
+  }: {
+    userId: string;
+    assetId: string;
+    assetPath: string;
+    metadata: AssetMetadata;
+  }) {
+    this.assertSupported(metadata);
+
+    await this.s3Client.send(
+      new PutObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+        Body: fs.createReadStream(assetPath),
+        ContentType: metadata.contentType,
+        Metadata: this.metadataToS3Metadata(metadata),
+      }),
+    );
+    // Only reached when the upload did not throw, so a failed upload keeps
+    // the source file in place.
+    await fs.promises.rm(assetPath);
+  }
+
+  /** Downloads the whole object into memory together with its metadata. */
+  async readAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    const response = await this.s3Client.send(
+      new GetObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+
+    if (!response.Body) {
+      throw new Error("Asset not found");
+    }
+
+    const assetBuffer = await this.streamToBuffer(response.Body as Readable);
+    const metadata = this.s3MetadataToMetadata(response.Metadata);
+
+    return { asset: assetBuffer, metadata };
+  }
+
+  /**
+   * Opens a stream over the object body, optionally restricted to the byte
+   * offsets `start`..`end`.
+   *
+   * Either bound may be omitted: a missing `start` reads from offset 0 and a
+   * missing `end` reads to the end of the object, matching what the file
+   * system store does by passing both values to `fs.createReadStream`.
+   * @throws Error("Asset not found") when the response carries no body.
+   */
+  async createAssetReadStream({
+    userId,
+    assetId,
+    start,
+    end,
+  }: {
+    userId: string;
+    assetId: string;
+    start?: number;
+    end?: number;
+  }) {
+    // HTTP byte ranges accept an open end ("bytes=100-"). Requiring both
+    // bounds would silently return the whole object for such a request.
+    const range =
+      start === undefined && end === undefined
+        ? undefined
+        : `bytes=${start ?? 0}-${end ?? ""}`;
+
+    const response = await this.s3Client.send(
+      new GetObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+        Range: range,
+      }),
+    );
+    if (!response.Body) {
+      throw new Error("Asset not found");
+    }
+    return response.Body as NodeJS.ReadableStream;
+  }
+
+  /** Reads the asset metadata with a HEAD request, without fetching the body. */
+  async readAssetMetadata({
+    userId,
+    assetId,
+  }: {
+    userId: string;
+    assetId: string;
+  }) {
+    const response = await this.s3Client.send(
+      new HeadObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+
+    return this.s3MetadataToMetadata(response.Metadata);
+  }
+
+  /** Size of the object in bytes; 0 when the response reports no length. */
+  async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) {
+    const response = await this.s3Client.send(
+      new HeadObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+
+    return response.ContentLength || 0;
+  }
+
+  /** Deletes the object of a single asset. */
+  async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) {
+    await this.s3Client.send(
+      new DeleteObjectCommand({
+        Bucket: this.bucketName,
+        Key: this.getAssetKey(userId, assetId),
+      }),
+    );
+  }
+
+  /**
+   * Deletes every object below the `<userId>/` prefix, one listing page at a
+   * time.
+   * @throws Error when the bulk delete reports that some keys could not be
+   *   removed.
+   */
+  async deleteUserAssets({ userId }: { userId: string }) {
+    let continuationToken: string | undefined;
+
+    do {
+      const listResponse = await this.s3Client.send(
+        new ListObjectsV2Command({
+          Bucket: this.bucketName,
+          Prefix: `${userId}/`,
+          ContinuationToken: continuationToken,
+        }),
+      );
+
+      // Skip entries without a key instead of asserting that one exists.
+      const objects = (listResponse.Contents ?? []).flatMap((obj) =>
+        obj.Key ? [{ Key: obj.Key }] : [],
+      );
+
+      if (objects.length > 0) {
+        const deleteResponse = await this.s3Client.send(
+          new DeleteObjectsCommand({
+            Bucket: this.bucketName,
+            Delete: { Objects: objects },
+          }),
+        );
+
+        // A bulk delete can succeed as a request while individual keys fail;
+        // those failures are only reported in `Errors`.
+        const failures = deleteResponse.Errors ?? [];
+        if (failures.length > 0) {
+          const details = failures
+            .map((failure) => `${failure.Key}: ${failure.Code}`)
+            .join(", ");
+          throw new Error(
+            `Failed to delete ${failures.length} asset(s) of user ${userId} (${details})`,
+          );
+        }
+      }
+
+      continuationToken = listResponse.NextContinuationToken;
+    } while (continuationToken);
+  }
+
+  /**
+   * Lists the whole bucket and yields one `AssetInfo`-shaped record for every
+   * object whose key has exactly the form `<userId>/<assetId>`. Objects whose
+   * HEAD request or metadata conversion fails are logged and skipped.
+   */
+  async *getAllAssets() {
+    let continuationToken: string | undefined;
+
+    do {
+      const listResponse = await this.s3Client.send(
+        new ListObjectsV2Command({
+          Bucket: this.bucketName,
+          ContinuationToken: continuationToken,
+        }),
+      );
+
+      for (const obj of listResponse.Contents ?? []) {
+        if (!obj.Key) continue;
+
+        const pathParts = obj.Key.split("/");
+        // Anything that is not exactly two segments is not one of our assets.
+        if (pathParts.length !== 2) continue;
+        const [userId, assetId] = pathParts;
+
+        let info;
+        try {
+          // The listing carries no user metadata, hence one HEAD per object.
+          const headResponse = await this.s3Client.send(
+            new HeadObjectCommand({
+              Bucket: this.bucketName,
+              Key: obj.Key,
+            }),
+          );
+
+          info = {
+            userId,
+            assetId,
+            ...this.s3MetadataToMetadata(headResponse.Metadata),
+            size: headResponse.ContentLength || 0,
+          };
+        } catch (error) {
+          logger.warn(`Failed to read asset ${userId}/${assetId}:`, error);
+          continue;
+        }
+        // Yield outside the try block so an error thrown into the generator
+        // by the consumer is not mistaken for a failed read.
+        yield info;
+      }
+
+      continuationToken = listResponse.NextContinuationToken;
+    } while (continuationToken);
+  }
+
+  /** Collects a readable stream into a single Buffer. */
+  private async streamToBuffer(stream: Readable): Promise<Buffer> {
+    const chunks: Buffer[] = [];
+    for await (const chunk of stream) {
+      // A stream with an encoding set yields strings; normalise to Buffer so
+      // Buffer.concat() always receives what it expects.
+      chunks.push(Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk));
+    }
+    return Buffer.concat(chunks);
+  }
+}
+
+/**
+ * Builds the asset store described by `serverConfig.assetStore`.
+ *
+ * Any store type other than "s3" yields a file system store rooted at
+ * `ROOT_PATH`. For "s3" the bucket and both credential values are mandatory.
+ *
+ * @throws Error when the S3 store is selected but the bucket or the
+ *   credentials are missing from the configuration.
+ */
+function createDefaultAssetStore(): AssetStore {
+  const { type, s3 } = serverConfig.assetStore;
+
+  if (type !== "s3") {
+    return new LocalFileSystemAssetStore(ROOT_PATH);
+  }
+
+  const { bucket, accessKeyId, secretAccessKey } = s3;
+
+  if (!bucket) {
+    throw new Error(
+      "ASSET_STORE_S3_BUCKET is required when using S3 asset store",
+    );
+  }
+  if (!(accessKeyId && secretAccessKey)) {
+    throw new Error(
+      "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store",
+    );
+  }
+
+  const client = new S3Client({
+    region: s3.region,
+    endpoint: s3.endpoint,
+    forcePathStyle: s3.forcePathStyle,
+    credentials: { accessKeyId, secretAccessKey },
+  });
+
+  return new S3AssetStore(client, bucket);
+}
+
+// Created once at module load; the exported helper functions of this module
+// delegate to this instance.
+const defaultAssetStore = createDefaultAssetStore();
+
+export { LocalFileSystemAssetStore, S3AssetStore };
+
+/**
+ * Example usage of S3AssetStore:
+ *
+ * import { S3Client } from "@aws-sdk/client-s3";
+ * import { S3AssetStore } from "@karakeep/shared/assetdb";
+ *
+ * const s3Client = new S3Client({
+ * region: "us-east-1",
+ * credentials: {
+ * accessKeyId: "your-access-key",
+ * secretAccessKey: "your-secret-key"
+ * }
+ * });
+ *
+ * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name");
+ *
+ * // Use s3AssetStore instead of the default file system store
+ * await s3AssetStore.saveAsset({
+ * userId: "user123",
+ * assetId: "asset456",
+ * asset: buffer,
+ * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" }
+ * });
+ */
+
export async function saveAsset({
userId,
assetId,
@@ -67,22 +622,7 @@ export async function saveAsset({
asset: Buffer;
metadata: z.infer<typeof zAssetMetadataSchema>;
}) {
- if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
- throw new Error("Unsupported asset type");
- }
- const assetDir = getAssetDir(userId, assetId);
- await fs.promises.mkdir(assetDir, { recursive: true });
-
- await Promise.all([
- fs.promises.writeFile(
- path.join(assetDir, "asset.bin"),
- Uint8Array.from(asset),
- ),
- fs.promises.writeFile(
- path.join(assetDir, "metadata.json"),
- JSON.stringify(metadata),
- ),
- ]);
+ return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata });
}
export async function saveAssetFromFile({
@@ -96,22 +636,12 @@ export async function saveAssetFromFile({
assetPath: string;
metadata: z.infer<typeof zAssetMetadataSchema>;
}) {
- if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) {
- throw new Error("Unsupported asset type");
- }
- const assetDir = getAssetDir(userId, assetId);
- await fs.promises.mkdir(assetDir, { recursive: true });
-
- await Promise.all([
- // We'll have to copy first then delete the original file as inside the docker container
- // we can't move file between mounts.
- fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")),
- fs.promises.writeFile(
- path.join(assetDir, "metadata.json"),
- JSON.stringify(metadata),
- ),
- ]);
- await fs.promises.rm(assetPath);
+ return defaultAssetStore.saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata,
+ });
}
export async function readAsset({
@@ -121,20 +651,10 @@ export async function readAsset({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
-
- const [asset, metadataStr] = await Promise.all([
- fs.promises.readFile(path.join(assetDir, "asset.bin")),
- fs.promises.readFile(path.join(assetDir, "metadata.json"), {
- encoding: "utf8",
- }),
- ]);
-
- const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr));
- return { asset, metadata };
+ return defaultAssetStore.readAsset({ userId, assetId });
}
-export function createAssetReadStream({
+export async function createAssetReadStream({
userId,
assetId,
start,
@@ -145,9 +665,9 @@ export function createAssetReadStream({
start?: number;
end?: number;
}) {
- const assetDir = getAssetDir(userId, assetId);
-
- return fs.createReadStream(path.join(assetDir, "asset.bin"), {
+ return defaultAssetStore.createAssetReadStream({
+ userId,
+ assetId,
start,
end,
});
@@ -160,16 +680,7 @@ export async function readAssetMetadata({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
-
- const metadataStr = await fs.promises.readFile(
- path.join(assetDir, "metadata.json"),
- {
- encoding: "utf8",
- },
- );
-
- return zAssetMetadataSchema.parse(JSON.parse(metadataStr));
+ return defaultAssetStore.readAssetMetadata({ userId, assetId });
}
export async function getAssetSize({
@@ -179,9 +690,7 @@ export async function getAssetSize({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
- const stat = await fs.promises.stat(path.join(assetDir, "asset.bin"));
- return stat.size;
+ return defaultAssetStore.getAssetSize({ userId, assetId });
}
/**
@@ -205,42 +714,15 @@ export async function deleteAsset({
userId: string;
assetId: string;
}) {
- const assetDir = getAssetDir(userId, assetId);
- await fs.promises.rm(path.join(assetDir), { recursive: true });
+ return defaultAssetStore.deleteAsset({ userId, assetId });
}
export async function deleteUserAssets({ userId }: { userId: string }) {
- const userDir = path.join(ROOT_PATH, userId);
- const dirExists = await fs.promises
- .access(userDir)
- .then(() => true)
- .catch(() => false);
- if (!dirExists) {
- return;
- }
- await fs.promises.rm(userDir, { recursive: true });
+ return defaultAssetStore.deleteUserAssets({ userId });
}
export async function* getAllAssets() {
- const g = new Glob(`/**/**/asset.bin`, {
- maxDepth: 3,
- root: ROOT_PATH,
- cwd: ROOT_PATH,
- absolute: false,
- });
- for await (const file of g) {
- const [userId, assetId] = file.split("/").slice(0, 2);
- const [size, metadata] = await Promise.all([
- getAssetSize({ userId, assetId }),
- readAssetMetadata({ userId, assetId }),
- ]);
- yield {
- userId,
- assetId,
- ...metadata,
- size,
- };
- }
+ yield* defaultAssetStore.getAllAssets();
}
export async function storeScreenshot(
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index b899dbeb..715c2848 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -88,6 +88,14 @@ const allEnv = z.object({
// A flag to detect if the user is running in the old separete containers setup
USING_LEGACY_SEPARATE_CONTAINERS: stringBool("false"),
+
+ // Asset storage configuration
+ ASSET_STORE_S3_ENDPOINT: z.string().optional(),
+ ASSET_STORE_S3_REGION: z.string().optional(),
+ ASSET_STORE_S3_BUCKET: z.string().optional(),
+ ASSET_STORE_S3_ACCESS_KEY_ID: z.string().optional(),
+ ASSET_STORE_S3_SECRET_ACCESS_KEY: z.string().optional(),
+ ASSET_STORE_S3_FORCE_PATH_STYLE: stringBool("false"),
});
const serverConfigSchema = allEnv.transform((val) => {
@@ -185,6 +193,19 @@ const serverConfigSchema = allEnv.transform((val) => {
timeoutSec: val.WEBHOOK_TIMEOUT_SEC,
retryTimes: val.WEBHOOK_RETRY_TIMES,
},
+ assetStore: {
+ type: val.ASSET_STORE_S3_ENDPOINT
+ ? ("s3" as const)
+ : ("filesystem" as const),
+ s3: {
+ endpoint: val.ASSET_STORE_S3_ENDPOINT,
+ region: val.ASSET_STORE_S3_REGION,
+ bucket: val.ASSET_STORE_S3_BUCKET,
+ accessKeyId: val.ASSET_STORE_S3_ACCESS_KEY_ID,
+ secretAccessKey: val.ASSET_STORE_S3_SECRET_ACCESS_KEY,
+ forcePathStyle: val.ASSET_STORE_S3_FORCE_PATH_STYLE,
+ },
+ },
};
});
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 691e1d25..f4e521b6 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -5,6 +5,7 @@
"private": true,
"type": "module",
"dependencies": {
+ "@aws-sdk/client-s3": "^3.842.0",
"glob": "^11.0.0",
"js-tiktoken": "^1.0.20",
"liteque": "^0.3.2",