From 6fffe3036c8fe983fcb2a1754deb22302f9b5020 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Mon, 1 Apr 2024 12:52:00 -0500 Subject: [PATCH] CE-881 - Add new StorageAction to backends - e.g., for streaming data into a backend's data store. implemented for in-memory, filesystem, and s3 --- .../actions/interfaces/QStorageInterface.java | 49 +++++ .../core/actions/tables/StorageAction.java | 96 +++++++++ .../actions/tables/storage/StorageInput.java | 77 +++++++ .../backend/QBackendModuleInterface.java | 11 + .../memory/MemoryBackendModule.java | 11 + .../memory/MemoryStorageAction.java | 149 ++++++++++++++ .../local/FilesystemBackendModule.java | 12 ++ .../actions/FilesystemStorageAction.java | 100 ++++++++++ .../module/filesystem/s3/S3BackendModule.java | 13 ++ .../s3/actions/S3StorageAction.java | 114 +++++++++++ .../s3/utils/S3UploadOutputStream.java | 188 ++++++++++++++++++ 11 files changed, 820 insertions(+) create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QStorageInterface.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/tables/StorageAction.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/storage/StorageInput.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryStorageAction.java create mode 100644 qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/actions/FilesystemStorageAction.java create mode 100644 qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/actions/S3StorageAction.java create mode 100644 qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/utils/S3UploadOutputStream.java diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QStorageInterface.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QStorageInterface.java new file mode 100644 index 00000000..1181494b --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QStorageInterface.java @@ -0,0 +1,49 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.actions.interfaces; + + +import java.io.InputStream; +import java.io.OutputStream; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput; + + +/******************************************************************************* + ** Interface for actions that a backend can perform, based on streaming data + ** into the backend's storage. + *******************************************************************************/ +public interface QStorageInterface +{ + + /******************************************************************************* + ** + *******************************************************************************/ + OutputStream createOutputStream(StorageInput storageInput) throws QException; + + + /******************************************************************************* + ** + *******************************************************************************/ + InputStream getInputStream(StorageInput storageInput) throws QException; + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/tables/StorageAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/tables/StorageAction.java new file mode 100644 index 00000000..f877dddb --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/tables/StorageAction.java @@ -0,0 +1,96 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.actions.tables; + + +import java.io.InputStream; +import java.io.OutputStream; +import com.kingsrook.qqq.backend.core.actions.ActionHelper; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput; +import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; +import com.kingsrook.qqq.backend.core.modules.backend.QBackendModuleDispatcher; +import com.kingsrook.qqq.backend.core.modules.backend.QBackendModuleInterface; + + +/******************************************************************************* + ** Action to do (generally, "mass") storage operations in a backend. + ** + ** e.g., store a (potentially large) file - specifically - by working with it + ** as either an InputStream or OutputStream. + ** + ** May not be implemented in all backends. + ** + *******************************************************************************/ +public class StorageAction +{ + + /******************************************************************************* + ** + *******************************************************************************/ + public OutputStream createOutputStream(StorageInput storageInput) throws QException + { + QBackendModuleInterface qBackendModuleInterface = preAction(storageInput); + QStorageInterface storageInterface = qBackendModuleInterface.getStorageInterface(); + return (storageInterface.createOutputStream(storageInput)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public InputStream getInputStream(StorageInput storageInput) throws QException + { + QBackendModuleInterface qBackendModuleInterface = preAction(storageInput); + QStorageInterface storageInterface = qBackendModuleInterface.getStorageInterface(); + return (storageInterface.getInputStream(storageInput)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private QBackendModuleInterface preAction(StorageInput storageInput) throws QException + { + ActionHelper.validateSession(storageInput); + + if(storageInput.getTableName() == null) + { + throw (new QException("Table name was not specified in query input")); + } + + QTableMetaData table = storageInput.getTable(); + if(table == null) + { + throw (new QException("A table named [" + storageInput.getTableName() + "] was not found in the active QInstance")); + } + + QBackendMetaData backend = storageInput.getBackend(); + QBackendModuleDispatcher qBackendModuleDispatcher = new QBackendModuleDispatcher(); + QBackendModuleInterface qModule = qBackendModuleDispatcher.getQBackendModule(backend); + return (qModule); + } +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/storage/StorageInput.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/storage/StorageInput.java new file mode 100644 index 00000000..5bd66c45 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/storage/StorageInput.java @@ -0,0 +1,77 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.model.actions.tables.storage; + + +import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput; + + +/******************************************************************************* + ** + *******************************************************************************/ +public class StorageInput extends AbstractTableActionInput +{ + private String reference; + + + + /******************************************************************************* + ** + *******************************************************************************/ + public StorageInput(String storageTableName) + { + super(); + setTableName(storageTableName); + } + + + + /******************************************************************************* + ** Getter for reference + *******************************************************************************/ + public String getReference() + { + return (this.reference); + } + + + + /******************************************************************************* + ** Setter for reference + *******************************************************************************/ + public void setReference(String reference) + { + this.reference = reference; + } + + + + /******************************************************************************* + ** Fluent setter for reference + *******************************************************************************/ + public StorageInput withReference(String reference) + { + this.reference = reference; + return (this); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/QBackendModuleInterface.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/QBackendModuleInterface.java index 64ce0c3c..aab77cb5 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/QBackendModuleInterface.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/QBackendModuleInterface.java @@ -28,6 +28,7 @@ import com.kingsrook.qqq.backend.core.actions.interfaces.CountInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.DeleteInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.GetInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.UpdateInterface; import com.kingsrook.qqq.backend.core.exceptions.QException; @@ -129,6 +130,16 @@ public interface QBackendModuleInterface return null; } + + /******************************************************************************* + ** + *******************************************************************************/ + default QStorageInterface getStorageInterface() + { + throwNotImplemented("StorageInterface"); + return null; + } + /******************************************************************************* ** *******************************************************************************/ diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryBackendModule.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryBackendModule.java index 4d6a93cb..113b30de 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryBackendModule.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryBackendModule.java @@ -26,6 +26,7 @@ import com.kingsrook.qqq.backend.core.actions.interfaces.AggregateInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.CountInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.DeleteInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.UpdateInterface; import com.kingsrook.qqq.backend.core.modules.backend.QBackendModuleInterface; @@ -117,4 +118,14 @@ public class MemoryBackendModule implements QBackendModuleInterface return (new MemoryDeleteAction()); } + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public QStorageInterface getStorageInterface() + { + return (new MemoryStorageAction()); + } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryStorageAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryStorageAction.java new file mode 100644 index 00000000..313145a7 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/modules/backend/implementations/memory/MemoryStorageAction.java @@ -0,0 +1,149 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.modules.backend.implementations.memory; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Optional; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; +import com.kingsrook.qqq.backend.core.actions.tables.GetAction; +import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; +import com.kingsrook.qqq.backend.core.context.QContext; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.exceptions.QNotFoundException; +import com.kingsrook.qqq.backend.core.model.actions.tables.get.GetInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType; +import com.kingsrook.qqq.backend.core.utils.CollectionUtils; +import com.kingsrook.qqq.backend.core.utils.StringUtils; + + +/******************************************************************************* + ** implementation of bulk-storage interface, for the memory backend module. + ** + ** Requires table to have (at least?) 2 fields - a STRING primary key and a + ** BLOB to store bytes. + *******************************************************************************/ +public class MemoryStorageAction implements QStorageInterface +{ + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public OutputStream createOutputStream(StorageInput storageInput) + { + return new MemoryStorageOutputStream(storageInput.getTableName(), storageInput.getReference()); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public InputStream getInputStream(StorageInput storageInput) throws QException + { + QRecord record = new GetAction().executeForRecord(new GetInput(storageInput.getTableName()).withPrimaryKey(storageInput.getReference())); + if(record == null) + { + throw (new QNotFoundException("Could not find input stream for [" + storageInput.getTableName() + "][" + storageInput.getReference() + "]")); + } + + QFieldMetaData blobField = getBlobField(storageInput.getTableName()); + return (new ByteArrayInputStream(record.getValueByteArray(blobField.getName()))); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private static QFieldMetaData getBlobField(String tableName) throws QException + { + Optional firstBlobField = QContext.getQInstance().getTable(tableName).getFields().values().stream().filter(f -> QFieldType.BLOB.equals(f.getType())).findFirst(); + if(firstBlobField.isEmpty()) + { + throw (new QException("Could not find a blob field in table [" + tableName + "]")); + } + return firstBlobField.get(); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private static class MemoryStorageOutputStream extends ByteArrayOutputStream + { + private final String tableName; + private final String reference; + + + + /******************************************************************************* + ** Constructor + ** + *******************************************************************************/ + public MemoryStorageOutputStream(String tableName, String reference) + { + this.tableName = tableName; + this.reference = reference; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public void close() throws IOException + { + super.close(); + + try + { + QFieldMetaData blobField = getBlobField(tableName); + InsertOutput insertOutput = new InsertAction().execute(new InsertInput(tableName).withRecord(new QRecord() + .withValue(QContext.getQInstance().getTable(tableName).getPrimaryKeyField(), reference) + .withValue(blobField.getName(), toByteArray()))); + + if(CollectionUtils.nullSafeHasContents(insertOutput.getRecords().get(0).getErrors())) + { + throw(new IOException("Error storing stream into memory storage: " + StringUtils.joinWithCommasAndAnd(insertOutput.getRecords().get(0).getErrors().stream().map(e -> e.getMessage()).toList()))); + } + } + catch(Exception e) + { + throw new IOException("Wrapped QException", e); + } + } + } +} diff --git a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/FilesystemBackendModule.java b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/FilesystemBackendModule.java index 820915ff..67cfc9ae 100644 --- a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/FilesystemBackendModule.java +++ b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/FilesystemBackendModule.java @@ -26,6 +26,7 @@ import java.io.File; import com.kingsrook.qqq.backend.core.actions.interfaces.CountInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.DeleteInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.UpdateInterface; import com.kingsrook.qqq.backend.core.logging.QLogger; @@ -39,6 +40,7 @@ import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemCount import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemDeleteAction; import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemInsertAction; import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemQueryAction; +import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemStorageAction; import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemUpdateAction; import com.kingsrook.qqq.backend.module.filesystem.local.model.metadata.FilesystemBackendMetaData; import com.kingsrook.qqq.backend.module.filesystem.local.model.metadata.FilesystemTableBackendDetails; @@ -152,4 +154,14 @@ public class FilesystemBackendModule implements QBackendModuleInterface, Filesys return (new FilesystemDeleteAction()); } + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public QStorageInterface getStorageInterface() + { + return (new FilesystemStorageAction()); + } } diff --git a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/actions/FilesystemStorageAction.java b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/actions/FilesystemStorageAction.java new file mode 100644 index 00000000..01978ec9 --- /dev/null +++ b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/local/actions/FilesystemStorageAction.java @@ -0,0 +1,100 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.module.filesystem.local.actions; + + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput; +import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; +import org.jetbrains.annotations.NotNull; + + +/******************************************************************************* + ** (mass, streamed) storage action for filesystem module + *******************************************************************************/ +public class FilesystemStorageAction extends AbstractFilesystemAction implements QStorageInterface +{ + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public OutputStream createOutputStream(StorageInput storageInput) throws QException + { + try + { + String fullPath = getFullPath(storageInput); + File file = new File(fullPath); + if(!file.getParentFile().mkdirs()) + { + throw(new QException("Could not make directory required for storing file: " + fullPath)); + } + + return (new FileOutputStream(fullPath)); + } + catch(IOException e) + { + throw (new QException("IOException creating output stream for file", e)); + } + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @NotNull + private String getFullPath(StorageInput storageInput) + { + QTableMetaData table = storageInput.getTable(); + QBackendMetaData backend = storageInput.getBackend(); + String fullPath = stripDuplicatedSlashes(getFullBasePath(table, backend) + File.separator + storageInput.getReference()); + return fullPath; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public InputStream getInputStream(StorageInput storageInput) throws QException + { + try + { + return (new FileInputStream(getFullPath(storageInput))); + } + catch(IOException e) + { + throw (new QException("IOException getting input stream for file", e)); + } + } + +} diff --git a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/S3BackendModule.java b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/S3BackendModule.java index d613dced..0d8872b6 100644 --- a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/S3BackendModule.java +++ b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/S3BackendModule.java @@ -25,6 +25,7 @@ package com.kingsrook.qqq.backend.module.filesystem.s3; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.kingsrook.qqq.backend.core.actions.interfaces.DeleteInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface; import com.kingsrook.qqq.backend.core.actions.interfaces.UpdateInterface; import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData; @@ -36,6 +37,7 @@ import com.kingsrook.qqq.backend.module.filesystem.s3.actions.AbstractS3Action; import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3DeleteAction; import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3InsertAction; import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3QueryAction; +import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3StorageAction; import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3UpdateAction; import com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata.S3BackendMetaData; import com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata.S3TableBackendDetails; @@ -136,4 +138,15 @@ public class S3BackendModule implements QBackendModuleInterface, FilesystemBacke return (new S3DeleteAction()); } + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public QStorageInterface getStorageInterface() + { + return new S3StorageAction(); + } + } diff --git a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/actions/S3StorageAction.java b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/actions/S3StorageAction.java new file mode 100644 index 00000000..7d924053 --- /dev/null +++ b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/actions/S3StorageAction.java @@ -0,0 +1,114 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.module.filesystem.s3.actions; + + +import java.io.File; +import java.io.InputStream; +import java.io.OutputStream; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput; +import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; +import com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata.S3BackendMetaData; +import com.kingsrook.qqq.backend.module.filesystem.s3.utils.S3UploadOutputStream; + + +/******************************************************************************* + ** (mass, streamed) storage action for filesystem module + *******************************************************************************/ +public class S3StorageAction extends AbstractS3Action implements QStorageInterface +{ + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public OutputStream createOutputStream(StorageInput storageInput) throws QException + { + try + { + S3BackendMetaData backend = (S3BackendMetaData) storageInput.getBackend(); + AmazonS3 amazonS3 = buildAmazonS3ClientFromBackendMetaData(backend); + String fullPath = getFullPath(storageInput); + S3UploadOutputStream s3UploadOutputStream = new S3UploadOutputStream(amazonS3, backend.getBucketName(), fullPath); + return (s3UploadOutputStream); + } + catch(Exception e) + { + throw (new QException("Exception creating s3 output stream for file", e)); + } + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private String getFullPath(StorageInput storageInput) + { + QTableMetaData table = storageInput.getTable(); + QBackendMetaData backend = storageInput.getBackend(); + String fullPath = stripDuplicatedSlashes(getFullBasePath(table, backend) + File.separator + storageInput.getReference()); + + ///////////////////////////////////////////////////////////// + // s3 seems to do better w/o leading slashes, so, strip... // + ///////////////////////////////////////////////////////////// + if(fullPath.startsWith("/")) + { + fullPath = fullPath.substring(1); + } + + return fullPath; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public InputStream getInputStream(StorageInput storageInput) throws QException + { + try + { + S3BackendMetaData backend = (S3BackendMetaData) storageInput.getBackend(); + AmazonS3 amazonS3 = buildAmazonS3ClientFromBackendMetaData(backend); + String fullPath = getFullPath(storageInput); + GetObjectRequest getObjectRequest = new GetObjectRequest(backend.getBucketName(), fullPath); + S3Object s3Object = amazonS3.getObject(getObjectRequest); + S3ObjectInputStream objectContent = s3Object.getObjectContent(); + + return (objectContent); + } + catch(Exception e) + { + throw (new QException("Exception getting s3 input stream for file", e)); + } + } + +} diff --git a/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/utils/S3UploadOutputStream.java b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/utils/S3UploadOutputStream.java new file mode 100644 index 00000000..2a3cd416 --- /dev/null +++ b/qqq-backend-module-filesystem/src/main/java/com/kingsrook/qqq/backend/module/filesystem/s3/utils/S3UploadOutputStream.java @@ -0,0 +1,188 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.module.filesystem.s3.utils; + + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult; +import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectResult; +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; + + +/******************************************************************************* + ** OutputStream implementation that knows how to stream data into a new S3 file. + ** + ** This will be done using a multipart-upload if the contents are > 5MB. + *******************************************************************************/ +public class S3UploadOutputStream extends OutputStream +{ + private final AmazonS3 amazonS3; + private final String bucketName; + private final String key; + + private byte[] buffer = new byte[5 * 1024 * 1024]; + private int offset = 0; + + private InitiateMultipartUploadResult initiateMultipartUploadResult = null; + private List uploadPartResultList = null; + + + + /******************************************************************************* + ** Constructor + ** + *******************************************************************************/ + public S3UploadOutputStream(AmazonS3 amazonS3, String bucketName, String key) + { + this.amazonS3 = amazonS3; + this.bucketName = bucketName; + this.key = key; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public void write(int b) throws IOException + { + buffer[offset] = (byte) b; + offset++; + + uploadIfNeeded(); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private void uploadIfNeeded() + { + if(offset == buffer.length) + { + ////////////////////////////////////////// + // start or continue a multipart upload // + ////////////////////////////////////////// + if(initiateMultipartUploadResult == null) + { + initiateMultipartUploadResult = amazonS3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key)); + uploadPartResultList = new ArrayList<>(); + } + + UploadPartRequest uploadPartRequest = new UploadPartRequest() + .withUploadId(initiateMultipartUploadResult.getUploadId()) + .withPartNumber(uploadPartResultList.size() + 1) + .withInputStream(new ByteArrayInputStream(buffer)) + .withBucketName(bucketName) + .withKey(key) + .withPartSize(buffer.length); + + uploadPartResultList.add(amazonS3.uploadPart(uploadPartRequest)); + + ////////////////// + // reset buffer // + ////////////////// + offset = 0; + } + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public void write(byte[] b, int off, int len) throws IOException + { + int bytesToWrite = len; + + while(bytesToWrite > buffer.length - offset) + { + int size = buffer.length - offset; + // System.out.println("A:copy " + size + " bytes from source[" + off + "] to dest[" + offset + "]"); + System.arraycopy(b, off, buffer, offset, size); + offset = buffer.length; + uploadIfNeeded(); + off += size; + bytesToWrite -= size; + } + + int size = len - off; + // System.out.println("B:copy " + size + " bytes from source[" + off + "] to dest[" + offset + "]"); + System.arraycopy(b, off, buffer, offset, size); + offset += size; + uploadIfNeeded(); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public void close() throws IOException + { + if(initiateMultipartUploadResult != null) + { + if (offset > 0) + { + ////////////////////////////////////////////////// + // if there's a final part to upload, do it now // + ////////////////////////////////////////////////// + UploadPartRequest uploadPartRequest = new UploadPartRequest() + .withUploadId(initiateMultipartUploadResult.getUploadId()) + .withPartNumber(uploadPartResultList.size() + 1) + .withInputStream(new ByteArrayInputStream(buffer, 0, offset)) + .withBucketName(bucketName) + .withKey(key) + .withPartSize(offset); + uploadPartResultList.add(amazonS3.uploadPart(uploadPartRequest)); + } + + CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest() + .withUploadId(initiateMultipartUploadResult.getUploadId()) + .withPartETags(uploadPartResultList) + .withBucketName(bucketName) + .withKey(key); + CompleteMultipartUploadResult completeMultipartUploadResult = amazonS3.completeMultipartUpload(completeMultipartUploadRequest); + } + else + { + ObjectMetadata objectMetadata = new ObjectMetadata(); + objectMetadata.setContentLength(offset); + PutObjectResult putObjectResult = amazonS3.putObject(bucketName, key, new ByteArrayInputStream(buffer, 0, offset), objectMetadata); + } + } + +}