diff options
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/api/utils/assets.ts | 4 | ||||
| -rw-r--r-- | packages/api/utils/upload.ts | 2 | ||||
| -rw-r--r-- | packages/e2e_tests/docker-compose.yml | 15 | ||||
| -rw-r--r-- | packages/e2e_tests/package.json | 5 | ||||
| -rw-r--r-- | packages/e2e_tests/setup/startContainers.ts | 4 | ||||
| -rw-r--r-- | packages/e2e_tests/tests/assetdb/assetdb-utils.ts | 289 | ||||
| -rw-r--r-- | packages/e2e_tests/tests/assetdb/interface-compliance.test.ts | 627 | ||||
| -rw-r--r-- | packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts | 228 | ||||
| -rw-r--r-- | packages/e2e_tests/tests/assetdb/s3-store.test.ts | 197 | ||||
| -rw-r--r-- | packages/shared/assetdb.ts | 670 | ||||
| -rw-r--r-- | packages/shared/config.ts | 21 | ||||
| -rw-r--r-- | packages/shared/package.json | 1 |
12 files changed, 1964 insertions, 99 deletions
diff --git a/packages/api/utils/assets.ts b/packages/api/utils/assets.ts index d8a726a6..205e1a76 100644 --- a/packages/api/utils/assets.ts +++ b/packages/api/utils/assets.ts @@ -28,7 +28,7 @@ export async function serveAsset(c: Context, assetId: string, userId: string) { const start = parseInt(parts[0], 10); const end = parts[1] ? parseInt(parts[1], 10) : size - 1; - const fStream = createAssetReadStream({ + const fStream = await createAssetReadStream({ userId, assetId, start, @@ -43,7 +43,7 @@ export async function serveAsset(c: Context, assetId: string, userId: string) { await stream.pipe(toWebReadableStream(fStream)); }); } else { - const fStream = createAssetReadStream({ + const fStream = await createAssetReadStream({ userId, assetId, }); diff --git a/packages/api/utils/upload.ts b/packages/api/utils/upload.ts index d96a0f60..daff1fb9 100644 --- a/packages/api/utils/upload.ts +++ b/packages/api/utils/upload.ts @@ -24,7 +24,7 @@ export function webStreamToNode( } export function toWebReadableStream( - nodeStream: fs.ReadStream, + nodeStream: NodeJS.ReadableStream, ): ReadableStream<Uint8Array> { const reader = nodeStream as unknown as Readable; diff --git a/packages/e2e_tests/docker-compose.yml b/packages/e2e_tests/docker-compose.yml index e1fe46bb..64775f46 100644 --- a/packages/e2e_tests/docker-compose.yml +++ b/packages/e2e_tests/docker-compose.yml @@ -35,3 +35,18 @@ services: restart: unless-stopped volumes: - ./setup/html:/usr/share/nginx/html + minio: + image: minio/minio:latest + restart: unless-stopped + ports: + - "9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: minioadmin + MINIO_ROOT_PASSWORD: minioadmin + command: server /data --console-address ":9001" + volumes: + - minio_data:/data + +volumes: + minio_data: diff --git a/packages/e2e_tests/package.json b/packages/e2e_tests/package.json index 3f110838..0cbb8fb3 100644 --- a/packages/e2e_tests/package.json +++ b/packages/e2e_tests/package.json @@ -11,10 +11,13 @@ "lint": "oxlint .", "lint:fix": "oxlint . --fix", "test": "vitest run", - "test:watch": "vitest" + "test:watch": "vitest", + "test:no-build": "E2E_TEST_NO_BUILD=1 vitest run" }, "dependencies": { + "@aws-sdk/client-s3": "^3.842.0", "@karakeep/sdk": "workspace:*", + "@karakeep/shared": "workspace:^0.1.0", "@karakeep/trpc": "workspace:^0.1.0", "superjson": "^2.2.1" }, diff --git a/packages/e2e_tests/setup/startContainers.ts b/packages/e2e_tests/setup/startContainers.ts index 3086d1c8..87e812a2 100644 --- a/packages/e2e_tests/setup/startContainers.ts +++ b/packages/e2e_tests/setup/startContainers.ts @@ -33,8 +33,10 @@ export default async function ({ provide }: GlobalSetupContext) { const __dirname = path.dirname(fileURLToPath(import.meta.url)); const port = await getRandomPort(); + const buildArg = process.env.E2E_TEST_NO_BUILD ? "" : "--build"; + console.log(`Starting docker compose on port ${port}...`); - execSync(`docker compose up --build -d`, { + execSync(`docker compose up ${buildArg} -d`, { cwd: __dirname, stdio: "inherit", env: { diff --git a/packages/e2e_tests/tests/assetdb/assetdb-utils.ts b/packages/e2e_tests/tests/assetdb/assetdb-utils.ts new file mode 100644 index 00000000..a8e29ab4 --- /dev/null +++ b/packages/e2e_tests/tests/assetdb/assetdb-utils.ts @@ -0,0 +1,289 @@ +import * as fs from "fs"; +import * as os from "os"; +import * as path from "path"; +import { + CreateBucketCommand, + DeleteBucketCommand, + DeleteObjectCommand, + ListObjectsV2Command, + S3Client, +} from "@aws-sdk/client-s3"; + +import { + ASSET_TYPES, + AssetMetadata, + AssetStore, + LocalFileSystemAssetStore, + S3AssetStore, +} from "@karakeep/shared/assetdb"; + +export interface TestAssetData { + userId: string; + assetId: string; + content: Buffer; + metadata: AssetMetadata; +} + +export function createTestAssetData( + overrides: Partial<TestAssetData> = {}, +): TestAssetData { + const defaultData: TestAssetData = { + userId: `user_${Math.random().toString(36).substring(7)}`, + assetId: `asset_${Math.random().toString(36).substring(7)}`, + content: Buffer.from(`Test content ${Math.random()}`), + metadata: { + contentType: ASSET_TYPES.TEXT_HTML, + fileName: "test.html", + }, + }; + + return { ...defaultData, ...overrides }; +} + +export function createTestImageData(): TestAssetData { + // Create a minimal PNG image (1x1 pixel) + const pngData = Buffer.from([ + 0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, + 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, + 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, 0x77, 0x53, 0xde, 0x00, 0x00, 0x00, + 0x0c, 0x49, 0x44, 0x41, 0x54, 0x08, 0xd7, 0x63, 0xf8, 0x0f, 0x00, 0x00, + 0x01, 0x00, 0x01, 0x5c, 0xc2, 0x8a, 0x8e, 0x00, 0x00, 0x00, 0x00, 0x49, + 0x45, 0x4e, 0x44, 0xae, 0x42, 0x60, 0x82, + ]); + + return createTestAssetData({ + content: pngData, + metadata: { + contentType: ASSET_TYPES.IMAGE_PNG, + fileName: "test.png", + }, + }); +} + +export function createTestPdfData(): TestAssetData { + // Create a minimal PDF + const pdfContent = `%PDF-1.4 +1 0 obj +<< +/Type /Catalog +/Pages 2 0 R +>> +endobj + +2 0 obj +<< +/Type /Pages +/Kids [3 0 R] +/Count 1 +>> +endobj + +3 0 obj +<< +/Type /Page +/Parent 2 0 R +/MediaBox [0 0 612 792] +>> +endobj + +xref +0 4 +0000000000 65535 f +0000000010 00000 n +0000000053 00000 n +0000000125 00000 n +trailer +<< +/Size 4 +/Root 1 0 R +>> +startxref +173 +%%EOF`; + + return createTestAssetData({ + content: Buffer.from(pdfContent), + metadata: { + contentType: ASSET_TYPES.APPLICATION_PDF, + fileName: "test.pdf", + }, + }); +} + +export async function createTempDirectory(): Promise<string> { + const tempDir = await fs.promises.mkdtemp( + path.join(os.tmpdir(), "assetdb-test-"), + ); + return tempDir; +} + +export async function cleanupTempDirectory(tempDir: string): Promise<void> { + try { + await fs.promises.rm(tempDir, { recursive: true, force: true }); + } catch (error) { + console.warn(`Failed to cleanup temp directory ${tempDir}:`, error); + } +} + +export function createLocalFileSystemStore( + rootPath: string, +): LocalFileSystemAssetStore { + return new LocalFileSystemAssetStore(rootPath); +} + +export function createS3Store(bucketName: string): S3AssetStore { + const s3Client = new S3Client({ + region: "us-east-1", + endpoint: "http://localhost:9000", // MinIO endpoint for testing + credentials: { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", + }, + forcePathStyle: true, + }); + + return new S3AssetStore(s3Client, bucketName); +} + +export async function createTestBucket(bucketName: string): Promise<S3Client> { + const s3Client = new S3Client({ + region: "us-east-1", + endpoint: "http://localhost:9000", + credentials: { + accessKeyId: "minioadmin", + secretAccessKey: "minioadmin", + }, + forcePathStyle: true, + }); + + try { + await s3Client.send(new CreateBucketCommand({ Bucket: bucketName })); + } catch (error: unknown) { + const err = error as { name?: string }; + if ( + err.name !== "BucketAlreadyOwnedByYou" && + err.name !== "BucketAlreadyExists" + ) { + throw error; + } + } + + return s3Client; +} + +export async function cleanupTestBucket( + s3Client: S3Client, + bucketName: string, +): Promise<void> { + try { + // List and delete all objects + let continuationToken: string | undefined; + do { + const listResponse = await s3Client.send( + new ListObjectsV2Command({ + Bucket: bucketName, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents && listResponse.Contents.length > 0) { + const deletePromises = listResponse.Contents.map( + (obj: { Key?: string }) => + s3Client.send( + new DeleteObjectCommand({ + Bucket: bucketName, + Key: obj.Key!, + }), + ), + ); + await Promise.all(deletePromises); + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + + // Delete the bucket + await s3Client.send(new DeleteBucketCommand({ Bucket: bucketName })); + } catch (error) { + console.warn(`Failed to cleanup S3 bucket ${bucketName}:`, error); + } +} + +export async function createTempFile( + content: Buffer, + fileName: string, +): Promise<string> { + const tempDir = await createTempDirectory(); + const filePath = path.join(tempDir, fileName); + await fs.promises.writeFile(filePath, content); + return filePath; +} + +export async function streamToBuffer( + stream: NodeJS.ReadableStream, +): Promise<Buffer> { + const chunks: Buffer[] = []; + const readable = stream as AsyncIterable<Buffer>; + + for await (const chunk of readable) { + chunks.push(chunk); + } + + return Buffer.concat(chunks); +} + +export function generateLargeBuffer(sizeInMB: number): Buffer { + const sizeInBytes = sizeInMB * 1024 * 1024; + const buffer = Buffer.alloc(sizeInBytes); + + // Fill with some pattern to make it compressible but not empty + for (let i = 0; i < sizeInBytes; i++) { + buffer[i] = i % 256; + } + + return buffer; +} + +export async function assertAssetExists( + store: AssetStore, + userId: string, + assetId: string, +): Promise<void> { + const { asset, metadata } = await store.readAsset({ userId, assetId }); + if (!asset || !metadata) { + throw new Error(`Asset ${assetId} for user ${userId} does not exist`); + } +} + +export async function assertAssetNotExists( + store: AssetStore, + userId: string, + assetId: string, +): Promise<void> { + try { + await store.readAsset({ userId, assetId }); + throw new Error(`Asset ${assetId} for user ${userId} should not exist`); + } catch (error: unknown) { + // Expected to throw + const err = error as { message?: string }; + if (err.message?.includes("should not exist")) { + throw error; + } + } +} + +export async function getAllAssetsArray(store: AssetStore): Promise< + { + userId: string; + assetId: string; + contentType: string; + fileName?: string | null; + size: number; + }[] +> { + const assets = []; + for await (const asset of store.getAllAssets()) { + assets.push(asset); + } + return assets; +} diff --git a/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts b/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts new file mode 100644 index 00000000..d5288c7a --- /dev/null +++ b/packages/e2e_tests/tests/assetdb/interface-compliance.test.ts @@ -0,0 +1,627 @@ +import * as fs from "fs"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { ASSET_TYPES, AssetStore } from "@karakeep/shared/assetdb"; + +import { + assertAssetExists, + assertAssetNotExists, + cleanupTempDirectory, + cleanupTestBucket, + createLocalFileSystemStore, + createS3Store, + createTempDirectory, + createTempFile, + createTestAssetData, + createTestBucket, + createTestImageData, + createTestPdfData, + generateLargeBuffer, + getAllAssetsArray, + streamToBuffer, +} from "./assetdb-utils"; + +interface TestContext { + store: AssetStore; + cleanup: () => Promise<void>; +} + +async function createLocalContext(): Promise<TestContext> { + const tempDir = await createTempDirectory(); + const store = createLocalFileSystemStore(tempDir); + + return { + store, + cleanup: async () => { + await cleanupTempDirectory(tempDir); + }, + }; +} + +async function createS3Context(): Promise<TestContext> { + const bucketName = `test-bucket-${Math.random().toString(36).substring(7)}`; + const s3Client = await createTestBucket(bucketName); + const store = createS3Store(bucketName); + + return { + store, + cleanup: async () => { + await cleanupTestBucket(s3Client, bucketName); + }, + }; +} + +describe.each([ + { name: "LocalFileSystemAssetStore", createContext: createLocalContext }, + { name: "S3AssetStore", createContext: createS3Context }, +])("AssetStore Interface Compliance - $name", ({ createContext }) => { + let context: TestContext; + + beforeEach(async () => { + context = await createContext(); + }); + + afterEach(async () => { + await context.cleanup(); + }); + + describe("Basic CRUD Operations", () => { + it("should save and retrieve an asset", async () => { + const testData = createTestAssetData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { asset, metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset).toEqual(testData.content); + expect(metadata).toEqual(testData.metadata); + }); + + it("should delete an asset", async () => { + const testData = createTestAssetData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + await assertAssetExists(context.store, testData.userId, testData.assetId); + + await context.store.deleteAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + await assertAssetNotExists( + context.store, + testData.userId, + testData.assetId, + ); + }); + + it("should get asset size", async () => { + const testData = createTestAssetData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const size = await context.store.getAssetSize({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(size).toBe(testData.content.length); + }); + + it("should read asset metadata", async () => { + const testData = createTestAssetData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const metadata = await context.store.readAssetMetadata({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(metadata).toEqual(testData.metadata); + }); + }); + + describe("Streaming Operations", () => { + it("should create readable stream", async () => { + const testData = createTestAssetData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const stream = await context.store.createAssetReadStream({ + userId: testData.userId, + assetId: testData.assetId, + }); + + const streamedContent = await streamToBuffer(stream); + expect(streamedContent).toEqual(testData.content); + }); + + it("should support range requests in streams", async () => { + const content = Buffer.from("0123456789abcdef"); + const testData = createTestAssetData({ content }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const stream = await context.store.createAssetReadStream({ + userId: testData.userId, + assetId: testData.assetId, + start: 5, + end: 10, + }); + + const streamedContent = await streamToBuffer(stream); + expect(streamedContent.toString()).toBe("56789a"); + }); + }); + + describe("Asset Types Support", () => { + it("should support all required asset types", async () => { + const testCases = [ + { contentType: ASSET_TYPES.IMAGE_JPEG, fileName: "test.jpg" }, + { contentType: ASSET_TYPES.IMAGE_PNG, fileName: "test.png" }, + { contentType: ASSET_TYPES.IMAGE_WEBP, fileName: "test.webp" }, + { contentType: ASSET_TYPES.APPLICATION_PDF, fileName: "test.pdf" }, + { contentType: ASSET_TYPES.TEXT_HTML, fileName: "test.html" }, + { contentType: ASSET_TYPES.VIDEO_MP4, fileName: "test.mp4" }, + ]; + + for (const { contentType, fileName } of testCases) { + const testData = createTestAssetData({ + metadata: { contentType, fileName }, + }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(metadata.contentType).toBe(contentType); + expect(metadata.fileName).toBe(fileName); + } + }); + + it("should handle large assets", async () => { + const largeContent = generateLargeBuffer(5); // 5MB + const testData = createTestAssetData({ content: largeContent }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { asset } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset.length).toBe(largeContent.length); + expect(asset).toEqual(largeContent); + }); + + it("should reject unsupported asset types", async () => { + const testData = createTestAssetData({ + metadata: { + contentType: "unsupported/type", + fileName: "test.unsupported", + }, + }); + + await expect( + context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }), + ).rejects.toThrow("Unsupported asset type"); + }); + }); + + describe("Bulk Operations", () => { + it("should delete all user assets", async () => { + const userId = "bulk-test-user"; + const testAssets = [ + createTestAssetData({ userId }), + createTestAssetData({ userId }), + createTestAssetData({ userId }), + ]; + const otherUserAsset = createTestAssetData(); // Different user + + // Save all assets + await Promise.all([ + ...testAssets.map((testData) => + context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }), + ), + context.store.saveAsset({ + userId: otherUserAsset.userId, + assetId: otherUserAsset.assetId, + asset: otherUserAsset.content, + metadata: otherUserAsset.metadata, + }), + ]); + + // Delete user assets + await context.store.deleteUserAssets({ userId }); + + // Verify user assets are deleted + for (const testData of testAssets) { + await assertAssetNotExists( + context.store, + testData.userId, + testData.assetId, + ); + } + + // Verify other user's asset still exists + await assertAssetExists( + context.store, + otherUserAsset.userId, + otherUserAsset.assetId, + ); + }); + + it("should list all assets", async () => { + const testAssets = [ + createTestAssetData(), + createTestImageData(), + createTestPdfData(), + ]; + + // Save all assets + await Promise.all( + testAssets.map((testData) => + context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }), + ), + ); + + const assets = await getAllAssetsArray(context.store); + + expect(assets).toHaveLength(3); + + // Verify all assets are present + const assetIds = assets.map((a) => a.assetId); + for (const testData of testAssets) { + expect(assetIds).toContain(testData.assetId); + } + + // Verify asset structure + for (const asset of assets) { + expect(asset).toHaveProperty("userId"); + expect(asset).toHaveProperty("assetId"); + expect(asset).toHaveProperty("contentType"); + expect(asset).toHaveProperty("size"); + expect(typeof asset.size).toBe("number"); + expect(asset.size).toBeGreaterThan(0); + } + }); + }); + + describe("File Operations", () => { + it("should save asset from file and delete original file", async () => { + const testData = createTestAssetData(); + const tempFile = await createTempFile(testData.content, "temp-asset.bin"); + + // Verify temp file exists before operation + expect( + await fs.promises + .access(tempFile) + .then(() => true) + .catch(() => false), + ).toBe(true); + + await context.store.saveAssetFromFile({ + userId: testData.userId, + assetId: testData.assetId, + assetPath: tempFile, + metadata: testData.metadata, + }); + + // Verify temp file was deleted + expect( + await fs.promises + .access(tempFile) + .then(() => true) + .catch(() => false), + ).toBe(false); + + // Verify asset was saved correctly + const { asset, metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset).toEqual(testData.content); + expect(metadata).toEqual(testData.metadata); + }); + }); + + describe("Error Handling", () => { + it("should throw error for non-existent asset read", async () => { + await expect( + context.store.readAsset({ + userId: "non-existent-user", + assetId: "non-existent-asset", + }), + ).rejects.toThrow(); + }); + + it("should throw error for non-existent asset metadata", async () => { + await expect( + context.store.readAssetMetadata({ + userId: "non-existent-user", + assetId: "non-existent-asset", + }), + ).rejects.toThrow(); + }); + + it("should throw error for non-existent asset size", async () => { + await expect( + context.store.getAssetSize({ + userId: "non-existent-user", + assetId: "non-existent-asset", + }), + ).rejects.toThrow(); + }); + + it("should handle deleting non-existent asset gracefully", async () => { + // Filesystem implementation throws errors for non-existent files + await expect( + context.store.deleteAsset({ + userId: "non-existent-user", + assetId: "non-existent-asset", + }), + ).resolves.not.toThrow(); + }); + + it("should handle deletion of non-existent user directory gracefully", async () => { + // Should not throw error when user directory doesn't exist + await expect( + context.store.deleteUserAssets({ userId: "non-existent-user" }), + ).resolves.not.toThrow(); + }); + + it("should handle non-existent asset stream appropriately", async () => { + const streamResult = context.store.createAssetReadStream({ + userId: "non-existent-user", + assetId: "non-existent-asset", + }); + + await expect(streamResult).rejects.toThrow(); + }); + }); + + describe("Data Integrity", () => { + it("should preserve binary data integrity", async () => { + const testData = createTestImageData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { asset } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + // Verify exact binary match + expect(asset).toEqual(testData.content); + + // Verify PNG header is intact + expect(asset[0]).toBe(0x89); + expect(asset[1]).toBe(0x50); + expect(asset[2]).toBe(0x4e); + expect(asset[3]).toBe(0x47); + }); + + it("should preserve metadata exactly", async () => { + const testData = createTestAssetData({ + metadata: { + contentType: ASSET_TYPES.APPLICATION_PDF, + fileName: "test-document.pdf", + }, + }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(metadata).toEqual(testData.metadata); + expect(metadata.contentType).toBe(ASSET_TYPES.APPLICATION_PDF); + expect(metadata.fileName).toBe("test-document.pdf"); + }); + + it("should handle null fileName correctly", async () => { + const testData = createTestAssetData({ + metadata: { + contentType: ASSET_TYPES.TEXT_HTML, + fileName: null, + }, + }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(metadata.fileName).toBeNull(); + }); + }); + + describe("Concurrent Operations", () => { + it("should handle concurrent saves safely", async () => { + const testAssets = Array.from({ length: 5 }, () => createTestAssetData()); + + await Promise.all( + testAssets.map((testData) => + context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }), + ), + ); + + // Verify all assets were saved correctly + for (const testData of testAssets) { + const { asset, metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset).toEqual(testData.content); + expect(metadata).toEqual(testData.metadata); + } + }); + + it("should handle concurrent reads safely", async () => { + const testData = createTestAssetData(); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Perform multiple concurrent reads + const readPromises = Array.from({ length: 10 }, () => + context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }), + ); + + const results = await Promise.all(readPromises); + + // Verify all reads returned the same data + for (const { asset, metadata } of results) { + expect(asset).toEqual(testData.content); + expect(metadata).toEqual(testData.metadata); + } + }); + }); + + describe("Edge Cases", () => { + it("should handle empty assets", async () => { + const testData = createTestAssetData({ + content: Buffer.alloc(0), + }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { asset } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset.length).toBe(0); + + const size = await context.store.getAssetSize({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(size).toBe(0); + }); + + it("should handle special characters in user and asset IDs", async () => { + const testData = createTestAssetData({ + userId: "user-with-special_chars.123", + assetId: "asset_with-special.chars_456", + }); + + await context.store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const { asset, metadata } = await context.store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset).toEqual(testData.content); + expect(metadata).toEqual(testData.metadata); + }); + }); +}); diff --git a/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts b/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts new file mode 100644 index 00000000..36ff837f --- /dev/null +++ b/packages/e2e_tests/tests/assetdb/local-filesystem-store.test.ts @@ -0,0 +1,228 @@ +import * as fs from "fs"; +import * as path from "path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { LocalFileSystemAssetStore } from "@karakeep/shared/assetdb"; + +import { + assertAssetNotExists, + cleanupTempDirectory, + createLocalFileSystemStore, + createTempDirectory, + createTestAssetData, +} from "./assetdb-utils"; + +describe("LocalFileSystemAssetStore - Filesystem-Specific Behaviors", () => { + let tempDir: string; + let store: LocalFileSystemAssetStore; + + beforeEach(async () => { + tempDir = await createTempDirectory(); + store = createLocalFileSystemStore(tempDir); + }); + + afterEach(async () => { + await cleanupTempDirectory(tempDir); + }); + + describe("File System Structure", () => { + it("should create correct directory structure and files", async () => { + const testData = createTestAssetData(); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Verify directory structure + const assetDir = path.join(tempDir, testData.userId, testData.assetId); + expect( + await fs.promises + .access(assetDir) + .then(() => true) + .catch(() => false), + ).toBe(true); + + // Verify asset.bin file + const assetFile = path.join(assetDir, "asset.bin"); + expect( + await fs.promises + .access(assetFile) + .then(() => true) + .catch(() => false), + ).toBe(true); + + // Verify metadata.json file + const metadataFile = path.join(assetDir, "metadata.json"); + expect( + await fs.promises + .access(metadataFile) + .then(() => true) + .catch(() => false), + ).toBe(true); + + // Verify file contents + const savedContent = await fs.promises.readFile(assetFile); + expect(savedContent).toEqual(testData.content); + + const savedMetadata = JSON.parse( + await fs.promises.readFile(metadataFile, "utf8"), + ); + expect(savedMetadata).toEqual(testData.metadata); + }); + + it("should create nested directory structure for user/asset hierarchy", async () => { + const userId = "user123"; + const assetId = "asset456"; + const testData = createTestAssetData({ userId, assetId }); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Verify the exact directory structure + const userDir = path.join(tempDir, userId); + const assetDir = path.join(userDir, assetId); + + expect( + await fs.promises + .access(userDir) + .then(() => true) + .catch(() => false), + ).toBe(true); + + expect( + await fs.promises + .access(assetDir) + .then(() => true) + .catch(() => false), + ).toBe(true); + + // Verify files exist in the correct location + expect( + await fs.promises + .access(path.join(assetDir, "asset.bin")) + .then(() => true) + .catch(() => false), + ).toBe(true); + + expect( + await fs.promises + .access(path.join(assetDir, "metadata.json")) + .then(() => true) + .catch(() => false), + ).toBe(true); + }); + }); + + describe("Directory Cleanup", () => { + it("should remove entire asset directory when deleting asset", async () => { + const testData = createTestAssetData(); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const assetDir = path.join(tempDir, testData.userId, testData.assetId); + + // Verify directory exists + expect( + await fs.promises + .access(assetDir) + .then(() => true) + .catch(() => false), + ).toBe(true); + + await store.deleteAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + // Verify entire directory was removed + expect( + await fs.promises + .access(assetDir) + .then(() => true) + .catch(() => false), + ).toBe(false); + + await assertAssetNotExists(store, testData.userId, testData.assetId); + }); + + it("should remove entire user directory when deleting all user assets", async () => { + const userId = "test-user"; + const testData1 = createTestAssetData({ userId }); + const testData2 = createTestAssetData({ userId }); + + await Promise.all([ + store.saveAsset({ + userId: testData1.userId, + assetId: testData1.assetId, + asset: testData1.content, + metadata: testData1.metadata, + }), + store.saveAsset({ + userId: testData2.userId, + assetId: testData2.assetId, + asset: testData2.content, + metadata: testData2.metadata, + }), + ]); + + const userDir = path.join(tempDir, userId); + + // Verify user directory exists + expect( + await fs.promises + .access(userDir) + .then(() => true) + .catch(() => false), + ).toBe(true); + + await store.deleteUserAssets({ userId }); + + // Verify entire user directory was removed + expect( + await fs.promises + .access(userDir) + .then(() => true) + .catch(() => false), + ).toBe(false); + }); + }); + + describe("File System Permissions", () => { + it("should create directories with appropriate permissions", async () => { + const testData = createTestAssetData(); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const userDir = path.join(tempDir, testData.userId); + const assetDir = path.join(userDir, testData.assetId); + + // Verify directories are readable and writable + const userStats = await fs.promises.stat(userDir); + const assetStats = await fs.promises.stat(assetDir); + + expect(userStats.isDirectory()).toBe(true); + expect(assetStats.isDirectory()).toBe(true); + + // Verify we can read and write to the directories + await fs.promises.access(userDir, fs.constants.R_OK | fs.constants.W_OK); + await fs.promises.access(assetDir, fs.constants.R_OK | fs.constants.W_OK); + }); + }); +}); diff --git a/packages/e2e_tests/tests/assetdb/s3-store.test.ts b/packages/e2e_tests/tests/assetdb/s3-store.test.ts new file mode 100644 index 00000000..c573750e --- /dev/null +++ b/packages/e2e_tests/tests/assetdb/s3-store.test.ts @@ -0,0 +1,197 @@ +import { HeadObjectCommand, S3Client } from "@aws-sdk/client-s3"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; + +import { S3AssetStore } from "@karakeep/shared/assetdb"; + +import { + assertAssetExists, + cleanupTestBucket, + createS3Store, + createTestAssetData, + createTestBucket, +} from "./assetdb-utils"; + +describe("S3AssetStore - S3-Specific Behaviors", () => { + let bucketName: string; + let s3Client: S3Client; + let store: S3AssetStore; + + beforeEach(async () => { + bucketName = `test-bucket-${Math.random().toString(36).substring(7)}`; + s3Client = await createTestBucket(bucketName); + store = createS3Store(bucketName); + }); + + afterEach(async () => { + await cleanupTestBucket(s3Client, bucketName); + }); + + describe("S3 Object Structure", () => { + it("should store asset as single object with user-defined metadata", async () => { + const testData = createTestAssetData(); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Verify the object exists with the expected key structure + const objectKey = `${testData.userId}/${testData.assetId}`; + const headResponse = await s3Client.send( + new HeadObjectCommand({ + Bucket: bucketName, + Key: objectKey, + }), + ); + + // Verify metadata is stored in S3 user-defined metadata + expect(headResponse.Metadata).toBeDefined(); + expect(headResponse.Metadata!["x-amz-meta-content-type"]).toBe( + testData.metadata.contentType, + ); + expect(headResponse.Metadata!["x-amz-meta-file-name"]).toBe( + testData.metadata.fileName || "", + ); + + // Verify content type is set correctly on the S3 object + expect(headResponse.ContentType).toBe(testData.metadata.contentType); + }); + + it("should handle null fileName in metadata correctly", async () => { + const testData = createTestAssetData({ + metadata: { + contentType: "text/html", + fileName: null, + }, + }); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + const objectKey = `${testData.userId}/${testData.assetId}`; + const headResponse = await s3Client.send( + new HeadObjectCommand({ + Bucket: bucketName, + Key: objectKey, + }), + ); + + // Verify null fileName are not stored in S3 metadata + expect(headResponse.Metadata!["x-amz-meta-file-name"]).toBeUndefined(); + + // Verify reading back gives us null fileName + const metadata = await store.readAssetMetadata({ + userId: testData.userId, + assetId: testData.assetId, + }); + expect(metadata.fileName).toBeNull(); + }); + }); + + describe("S3 Key Structure", () => { + it("should use clean userId/assetId key structure", async () => { + const userId = "user123"; + const assetId = "asset456"; + const testData = createTestAssetData({ userId, assetId }); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Verify the exact key structure + const expectedKey = `${userId}/${assetId}`; + const headResponse = await s3Client.send( + new HeadObjectCommand({ + Bucket: bucketName, + Key: expectedKey, + }), + ); + + expect(headResponse.ContentLength).toBe(testData.content.length); + }); + + it("should handle special characters in user and asset IDs", async () => { + const userId = "user/with/slashes"; + const assetId = "asset-with-special-chars_123"; + const testData = createTestAssetData({ userId, assetId }); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + await assertAssetExists(store, testData.userId, testData.assetId); + }); + }); + + describe("S3 Eventual Consistency", () => { + it("should handle immediate read after write (MinIO strong consistency)", async () => { + const testData = createTestAssetData(); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Immediately try to read - should work with MinIO's strong consistency + const { asset, metadata } = await store.readAsset({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(asset).toEqual(testData.content); + expect(metadata).toEqual(testData.metadata); + }); + }); + + describe("S3 Metadata Conversion", () => { + it("should correctly convert between AssetMetadata and S3 metadata", async () => { + const testCases = [ + { + contentType: "image/jpeg", + fileName: "test-image.jpg", + }, + { + contentType: "application/pdf", + fileName: "document.pdf", + }, + { + contentType: "text/html", + fileName: null, + }, + ]; + + for (const metadata of testCases) { + const testData = createTestAssetData({ metadata }); + + await store.saveAsset({ + userId: testData.userId, + assetId: testData.assetId, + asset: testData.content, + metadata: testData.metadata, + }); + + // Verify metadata round-trip + const retrievedMetadata = await store.readAssetMetadata({ + userId: testData.userId, + assetId: testData.assetId, + }); + + expect(retrievedMetadata).toEqual(testData.metadata); + } + }); + }); +}); diff --git a/packages/shared/assetdb.ts b/packages/shared/assetdb.ts index 3ac92067..77050406 100644 --- a/packages/shared/assetdb.ts +++ b/packages/shared/assetdb.ts @@ -1,5 +1,16 @@ import * as fs from "fs"; import * as path from "path"; +import { Readable } from "stream"; +import { + _Object, + DeleteObjectCommand, + DeleteObjectsCommand, + GetObjectCommand, + HeadObjectCommand, + ListObjectsV2Command, + PutObjectCommand, + S3Client, +} from "@aws-sdk/client-s3"; import { Glob } from "glob"; import { z } from "zod"; @@ -43,19 +54,563 @@ export const SUPPORTED_ASSET_TYPES: Set<string> = new Set<string>([ ASSET_TYPES.VIDEO_MP4, ]); -function getAssetDir(userId: string, assetId: string) { - return path.join(ROOT_PATH, userId, assetId); -} - export const zAssetMetadataSchema = z.object({ contentType: z.string(), fileName: z.string().nullish(), }); +export type AssetMetadata = z.infer<typeof zAssetMetadataSchema>; + +export interface AssetInfo { + userId: string; + assetId: string; + contentType: string; + fileName?: string | null; + size: number; +} + +export interface AssetStore { + saveAsset(params: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }): Promise<void>; + + saveAssetFromFile(params: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }): Promise<void>; + + readAsset(params: { + userId: string; + assetId: string; + }): Promise<{ asset: Buffer; metadata: AssetMetadata }>; + + createAssetReadStream(params: { + userId: string; + assetId: string; + start?: number; + end?: number; + }): Promise<NodeJS.ReadableStream>; + + readAssetMetadata(params: { + userId: string; + assetId: string; + }): Promise<AssetMetadata>; + + getAssetSize(params: { userId: string; assetId: string }): Promise<number>; + + deleteAsset(params: { userId: string; assetId: string }): Promise<void>; + + deleteUserAssets(params: { userId: string }): Promise<void>; + + getAllAssets(): AsyncGenerator<AssetInfo>; +} + export function newAssetId() { return crypto.randomUUID(); } +class LocalFileSystemAssetStore implements AssetStore { + private rootPath: string; + + constructor(rootPath: string) { + this.rootPath = rootPath; + } + + private getAssetDir(userId: string, assetId: string) { + return path.join(this.rootPath, userId, assetId); + } + + private async isPathExists(filePath: string) { + return fs.promises + .access(filePath) + .then(() => true) + .catch(() => false); + } + + async saveAsset({ + userId, + assetId, + asset, + metadata, + }: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + const assetDir = this.getAssetDir(userId, assetId); + await fs.promises.mkdir(assetDir, { recursive: true }); + + await Promise.all([ + fs.promises.writeFile( + path.join(assetDir, "asset.bin"), + Uint8Array.from(asset), + ), + fs.promises.writeFile( + path.join(assetDir, "metadata.json"), + JSON.stringify(metadata), + ), + ]); + } + + async saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + const assetDir = this.getAssetDir(userId, assetId); + await fs.promises.mkdir(assetDir, { recursive: true }); + + await Promise.all([ + fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")), + fs.promises.writeFile( + path.join(assetDir, "metadata.json"), + JSON.stringify(metadata), + ), + ]); + await fs.promises.rm(assetPath); + } + + async readAsset({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + + const [asset, metadataStr] = await Promise.all([ + fs.promises.readFile(path.join(assetDir, "asset.bin")), + fs.promises.readFile(path.join(assetDir, "metadata.json"), { + encoding: "utf8", + }), + ]); + + const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + return { asset, metadata }; + } + + async createAssetReadStream({ + userId, + assetId, + start, + end, + }: { + userId: string; + assetId: string; + start?: number; + end?: number; + }) { + const assetDir = this.getAssetDir(userId, assetId); + const assetPath = path.join(assetDir, "asset.bin"); + if (!(await this.isPathExists(assetPath))) { + throw new Error(`Asset ${assetId} not found`); + } + + return fs.createReadStream(path.join(assetDir, "asset.bin"), { + start, + end, + }); + } + + async readAssetMetadata({ + userId, + assetId, + }: { + userId: string; + assetId: string; + }) { + const assetDir = this.getAssetDir(userId, assetId); + + const metadataStr = await fs.promises.readFile( + path.join(assetDir, "metadata.json"), + { + encoding: "utf8", + }, + ); + + return zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + } + + async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + const stat = await fs.promises.stat(path.join(assetDir, "asset.bin")); + return stat.size; + } + + async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { + const assetDir = this.getAssetDir(userId, assetId); + if (!(await this.isPathExists(assetDir))) { + return; + } + await fs.promises.rm(assetDir, { recursive: true }); + } + + async deleteUserAssets({ userId }: { userId: string }) { + const userDir = path.join(this.rootPath, userId); + const dirExists = await this.isPathExists(userDir); + if (!dirExists) { + return; + } + await fs.promises.rm(userDir, { recursive: true }); + } + + async *getAllAssets() { + const g = new Glob(`/**/**/asset.bin`, { + maxDepth: 3, + root: this.rootPath, + cwd: this.rootPath, + absolute: false, + }); + for await (const file of g) { + const [userId, assetId] = file.split("/").slice(0, 2); + const [size, metadata] = await Promise.all([ + this.getAssetSize({ userId, assetId }), + this.readAssetMetadata({ userId, assetId }), + ]); + yield { + userId, + assetId, + ...metadata, + size, + }; + } + } +} + +class S3AssetStore implements AssetStore { + private s3Client: S3Client; + private bucketName: string; + + constructor(s3Client: S3Client, bucketName: string) { + this.s3Client = s3Client; + this.bucketName = bucketName; + } + + private getAssetKey(userId: string, assetId: string) { + return `${userId}/${assetId}`; + } + + private metadataToS3Metadata( + metadata: AssetMetadata, + ): Record<string, string> { + return { + ...(metadata.fileName + ? { "x-amz-meta-file-name": metadata.fileName } + : {}), + "x-amz-meta-content-type": metadata.contentType, + }; + } + + private s3MetadataToMetadata( + s3Metadata: Record<string, string> | undefined, + ): AssetMetadata { + if (!s3Metadata) { + throw new Error("No metadata found in S3 object"); + } + + return { + contentType: s3Metadata["x-amz-meta-content-type"] || "", + fileName: s3Metadata["x-amz-meta-file-name"] ?? null, + }; + } + + async saveAsset({ + userId, + assetId, + asset, + metadata, + }: { + userId: string; + assetId: string; + asset: Buffer; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Body: asset, + ContentType: metadata.contentType, + Metadata: this.metadataToS3Metadata(metadata), + }), + ); + } + + async saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }: { + userId: string; + assetId: string; + assetPath: string; + metadata: AssetMetadata; + }) { + if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { + throw new Error("Unsupported asset type"); + } + + const asset = fs.createReadStream(assetPath); + await this.s3Client.send( + new PutObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Body: asset, + ContentType: metadata.contentType, + Metadata: this.metadataToS3Metadata(metadata), + }), + ); + await fs.promises.rm(assetPath); + } + + async readAsset({ userId, assetId }: { userId: string; assetId: string }) { + const response = await this.s3Client.send( + new GetObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + if (!response.Body) { + throw new Error("Asset not found"); + } + + const assetBuffer = await this.streamToBuffer(response.Body as Readable); + const metadata = this.s3MetadataToMetadata(response.Metadata); + + return { asset: assetBuffer, metadata }; + } + + async createAssetReadStream({ + userId, + assetId, + start, + end, + }: { + userId: string; + assetId: string; + start?: number; + end?: number; + }) { + const range = + start !== undefined && end !== undefined + ? `bytes=${start}-${end}` + : undefined; + + const command = new GetObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + Range: range, + }); + + const response = await this.s3Client.send(command); + if (!response.Body) { + throw new Error("Asset not found"); + } + return response.Body as NodeJS.ReadableStream; + } + + async readAssetMetadata({ + userId, + assetId, + }: { + userId: string; + assetId: string; + }) { + const response = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + return this.s3MetadataToMetadata(response.Metadata); + } + + async getAssetSize({ userId, assetId }: { userId: string; assetId: string }) { + const response = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + + return response.ContentLength || 0; + } + + async deleteAsset({ userId, assetId }: { userId: string; assetId: string }) { + await this.s3Client.send( + new DeleteObjectCommand({ + Bucket: this.bucketName, + Key: this.getAssetKey(userId, assetId), + }), + ); + } + + async deleteUserAssets({ userId }: { userId: string }) { + let continuationToken: string | undefined; + + do { + const listResponse = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucketName, + Prefix: `${userId}/`, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents && listResponse.Contents.length > 0) { + await this.s3Client.send( + new DeleteObjectsCommand({ + Bucket: this.bucketName, + Delete: { + Objects: listResponse.Contents.map((obj) => ({ + Key: obj.Key!, + })), + }, + }), + ); + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + } + + async *getAllAssets() { + let continuationToken: string | undefined; + + do { + const listResponse = await this.s3Client.send( + new ListObjectsV2Command({ + Bucket: this.bucketName, + ContinuationToken: continuationToken, + }), + ); + + if (listResponse.Contents) { + for (const obj of listResponse.Contents) { + if (!obj.Key) continue; + + const pathParts = obj.Key.split("/"); + if (pathParts.length === 2) { + const userId = pathParts[0]; + const assetId = pathParts[1]; + + try { + const headResponse = await this.s3Client.send( + new HeadObjectCommand({ + Bucket: this.bucketName, + Key: obj.Key, + }), + ); + + const metadata = this.s3MetadataToMetadata(headResponse.Metadata); + const size = headResponse.ContentLength || 0; + + yield { + userId, + assetId, + ...metadata, + size, + }; + } catch (error) { + logger.warn(`Failed to read asset ${userId}/${assetId}:`, error); + } + } + } + } + + continuationToken = listResponse.NextContinuationToken; + } while (continuationToken); + } + + private async streamToBuffer(stream: Readable): Promise<Buffer> { + const chunks: Buffer[] = []; + for await (const chunk of stream) { + chunks.push(chunk); + } + return Buffer.concat(chunks); + } +} + +function createDefaultAssetStore(): AssetStore { + const config = serverConfig.assetStore; + + if (config.type === "s3") { + if (!config.s3.bucket) { + throw new Error( + "ASSET_STORE_S3_BUCKET is required when using S3 asset store", + ); + } + if (!config.s3.accessKeyId || !config.s3.secretAccessKey) { + throw new Error( + "ASSET_STORE_S3_ACCESS_KEY_ID and ASSET_STORE_S3_SECRET_ACCESS_KEY are required when using S3 asset store", + ); + } + + const s3Client = new S3Client({ + region: config.s3.region, + endpoint: config.s3.endpoint, + forcePathStyle: config.s3.forcePathStyle, + credentials: { + accessKeyId: config.s3.accessKeyId, + secretAccessKey: config.s3.secretAccessKey, + }, + }); + + return new S3AssetStore(s3Client, config.s3.bucket); + } + + return new LocalFileSystemAssetStore(ROOT_PATH); +} + +const defaultAssetStore = createDefaultAssetStore(); + +export { LocalFileSystemAssetStore, S3AssetStore }; + +/** + * Example usage of S3AssetStore: + * + * import { S3Client } from "@aws-sdk/client-s3"; + * import { S3AssetStore } from "@karakeep/shared/assetdb"; + * + * const s3Client = new S3Client({ + * region: "us-east-1", + * credentials: { + * accessKeyId: "your-access-key", + * secretAccessKey: "your-secret-key" + * } + * }); + * + * const s3AssetStore = new S3AssetStore(s3Client, "your-bucket-name"); + * + * // Use s3AssetStore instead of the default file system store + * await s3AssetStore.saveAsset({ + * userId: "user123", + * assetId: "asset456", + * asset: buffer, + * metadata: { contentType: "image/jpeg", fileName: "photo.jpg" } + * }); + */ + export async function saveAsset({ userId, assetId, @@ -67,22 +622,7 @@ export async function saveAsset({ asset: Buffer; metadata: z.infer<typeof zAssetMetadataSchema>; }) { - if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { - throw new Error("Unsupported asset type"); - } - const assetDir = getAssetDir(userId, assetId); - await fs.promises.mkdir(assetDir, { recursive: true }); - - await Promise.all([ - fs.promises.writeFile( - path.join(assetDir, "asset.bin"), - Uint8Array.from(asset), - ), - fs.promises.writeFile( - path.join(assetDir, "metadata.json"), - JSON.stringify(metadata), - ), - ]); + return defaultAssetStore.saveAsset({ userId, assetId, asset, metadata }); } export async function saveAssetFromFile({ @@ -96,22 +636,12 @@ export async function saveAssetFromFile({ assetPath: string; metadata: z.infer<typeof zAssetMetadataSchema>; }) { - if (!SUPPORTED_ASSET_TYPES.has(metadata.contentType)) { - throw new Error("Unsupported asset type"); - } - const assetDir = getAssetDir(userId, assetId); - await fs.promises.mkdir(assetDir, { recursive: true }); - - await Promise.all([ - // We'll have to copy first then delete the original file as inside the docker container - // we can't move file between mounts. - fs.promises.copyFile(assetPath, path.join(assetDir, "asset.bin")), - fs.promises.writeFile( - path.join(assetDir, "metadata.json"), - JSON.stringify(metadata), - ), - ]); - await fs.promises.rm(assetPath); + return defaultAssetStore.saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata, + }); } export async function readAsset({ @@ -121,20 +651,10 @@ export async function readAsset({ userId: string; assetId: string; }) { - const assetDir = getAssetDir(userId, assetId); - - const [asset, metadataStr] = await Promise.all([ - fs.promises.readFile(path.join(assetDir, "asset.bin")), - fs.promises.readFile(path.join(assetDir, "metadata.json"), { - encoding: "utf8", - }), - ]); - - const metadata = zAssetMetadataSchema.parse(JSON.parse(metadataStr)); - return { asset, metadata }; + return defaultAssetStore.readAsset({ userId, assetId }); } -export function createAssetReadStream({ +export async function createAssetReadStream({ userId, assetId, start, @@ -145,9 +665,9 @@ export function createAssetReadStream({ start?: number; end?: number; }) { - const assetDir = getAssetDir(userId, assetId); - - return fs.createReadStream(path.join(assetDir, "asset.bin"), { + return defaultAssetStore.createAssetReadStream({ + userId, + assetId, start, end, }); @@ -160,16 +680,7 @@ export async function readAssetMetadata({ userId: string; assetId: string; }) { - const assetDir = getAssetDir(userId, assetId); - - const metadataStr = await fs.promises.readFile( - path.join(assetDir, "metadata.json"), - { - encoding: "utf8", - }, - ); - - return zAssetMetadataSchema.parse(JSON.parse(metadataStr)); + return defaultAssetStore.readAssetMetadata({ userId, assetId }); } export async function getAssetSize({ @@ -179,9 +690,7 @@ export async function getAssetSize({ userId: string; assetId: string; }) { - const assetDir = getAssetDir(userId, assetId); - const stat = await fs.promises.stat(path.join(assetDir, "asset.bin")); - return stat.size; + return defaultAssetStore.getAssetSize({ userId, assetId }); } /** @@ -205,42 +714,15 @@ export async function deleteAsset({ userId: string; assetId: string; }) { - const assetDir = getAssetDir(userId, assetId); - await fs.promises.rm(path.join(assetDir), { recursive: true }); + return defaultAssetStore.deleteAsset({ userId, assetId }); } export async function deleteUserAssets({ userId }: { userId: string }) { - const userDir = path.join(ROOT_PATH, userId); - const dirExists = await fs.promises - .access(userDir) - .then(() => true) - .catch(() => false); - if (!dirExists) { - return; - } - await fs.promises.rm(userDir, { recursive: true }); + return defaultAssetStore.deleteUserAssets({ userId }); } export async function* getAllAssets() { - const g = new Glob(`/**/**/asset.bin`, { - maxDepth: 3, - root: ROOT_PATH, - cwd: ROOT_PATH, - absolute: false, - }); - for await (const file of g) { - const [userId, assetId] = file.split("/").slice(0, 2); - const [size, metadata] = await Promise.all([ - getAssetSize({ userId, assetId }), - readAssetMetadata({ userId, assetId }), - ]); - yield { - userId, - assetId, - ...metadata, - size, - }; - } + yield* defaultAssetStore.getAllAssets(); } export async function storeScreenshot( diff --git a/packages/shared/config.ts b/packages/shared/config.ts index b899dbeb..715c2848 100644 --- a/packages/shared/config.ts +++ b/packages/shared/config.ts @@ -88,6 +88,14 @@ const allEnv = z.object({ // A flag to detect if the user is running in the old separete containers setup USING_LEGACY_SEPARATE_CONTAINERS: stringBool("false"), + + // Asset storage configuration + ASSET_STORE_S3_ENDPOINT: z.string().optional(), + ASSET_STORE_S3_REGION: z.string().optional(), + ASSET_STORE_S3_BUCKET: z.string().optional(), + ASSET_STORE_S3_ACCESS_KEY_ID: z.string().optional(), + ASSET_STORE_S3_SECRET_ACCESS_KEY: z.string().optional(), + ASSET_STORE_S3_FORCE_PATH_STYLE: stringBool("false"), }); const serverConfigSchema = allEnv.transform((val) => { @@ -185,6 +193,19 @@ const serverConfigSchema = allEnv.transform((val) => { timeoutSec: val.WEBHOOK_TIMEOUT_SEC, retryTimes: val.WEBHOOK_RETRY_TIMES, }, + assetStore: { + type: val.ASSET_STORE_S3_ENDPOINT + ? ("s3" as const) + : ("filesystem" as const), + s3: { + endpoint: val.ASSET_STORE_S3_ENDPOINT, + region: val.ASSET_STORE_S3_REGION, + bucket: val.ASSET_STORE_S3_BUCKET, + accessKeyId: val.ASSET_STORE_S3_ACCESS_KEY_ID, + secretAccessKey: val.ASSET_STORE_S3_SECRET_ACCESS_KEY, + forcePathStyle: val.ASSET_STORE_S3_FORCE_PATH_STYLE, + }, + }, }; }); diff --git a/packages/shared/package.json b/packages/shared/package.json index 691e1d25..f4e521b6 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -5,6 +5,7 @@ "private": true, "type": "module", "dependencies": { + "@aws-sdk/client-s3": "^3.842.0", "glob": "^11.0.0", "js-tiktoken": "^1.0.20", "liteque": "^0.3.2", |
