CE-881 - Add new StorageAction to backends - e.g., for streaming data into a backend's data store. implemented for in-memory, filesystem, and s3

This commit is contained in:
2024-04-01 12:52:00 -05:00
parent 1eeb57f32f
commit 6fffe3036c
11 changed files with 820 additions and 0 deletions

View File

@ -26,6 +26,7 @@ import java.io.File;
import com.kingsrook.qqq.backend.core.actions.interfaces.CountInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.DeleteInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.UpdateInterface;
import com.kingsrook.qqq.backend.core.logging.QLogger;
@ -39,6 +40,7 @@ import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemCount
import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemDeleteAction;
import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemInsertAction;
import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemQueryAction;
import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemStorageAction;
import com.kingsrook.qqq.backend.module.filesystem.local.actions.FilesystemUpdateAction;
import com.kingsrook.qqq.backend.module.filesystem.local.model.metadata.FilesystemBackendMetaData;
import com.kingsrook.qqq.backend.module.filesystem.local.model.metadata.FilesystemTableBackendDetails;
@ -152,4 +154,14 @@ public class FilesystemBackendModule implements QBackendModuleInterface, Filesys
return (new FilesystemDeleteAction());
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public QStorageInterface getStorageInterface()
{
return (new FilesystemStorageAction());
}
}

View File

@ -0,0 +1,100 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.module.filesystem.local.actions;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput;
import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import org.jetbrains.annotations.NotNull;
/*******************************************************************************
** (mass, streamed) storage action for filesystem module
*******************************************************************************/
public class FilesystemStorageAction extends AbstractFilesystemAction implements QStorageInterface
{
/*******************************************************************************
**
*******************************************************************************/
@Override
public OutputStream createOutputStream(StorageInput storageInput) throws QException
{
try
{
String fullPath = getFullPath(storageInput);
File file = new File(fullPath);
if(!file.getParentFile().mkdirs())
{
throw(new QException("Could not make directory required for storing file: " + fullPath));
}
return (new FileOutputStream(fullPath));
}
catch(IOException e)
{
throw (new QException("IOException creating output stream for file", e));
}
}
/*******************************************************************************
**
*******************************************************************************/
@NotNull
private String getFullPath(StorageInput storageInput)
{
QTableMetaData table = storageInput.getTable();
QBackendMetaData backend = storageInput.getBackend();
String fullPath = stripDuplicatedSlashes(getFullBasePath(table, backend) + File.separator + storageInput.getReference());
return fullPath;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public InputStream getInputStream(StorageInput storageInput) throws QException
{
try
{
return (new FileInputStream(getFullPath(storageInput)));
}
catch(IOException e)
{
throw (new QException("IOException getting input stream for file", e));
}
}
}

View File

@ -25,6 +25,7 @@ package com.kingsrook.qqq.backend.module.filesystem.s3;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.kingsrook.qqq.backend.core.actions.interfaces.DeleteInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.UpdateInterface;
import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
@ -36,6 +37,7 @@ import com.kingsrook.qqq.backend.module.filesystem.s3.actions.AbstractS3Action;
import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3DeleteAction;
import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3InsertAction;
import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3QueryAction;
import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3StorageAction;
import com.kingsrook.qqq.backend.module.filesystem.s3.actions.S3UpdateAction;
import com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata.S3BackendMetaData;
import com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata.S3TableBackendDetails;
@ -136,4 +138,15 @@ public class S3BackendModule implements QBackendModuleInterface, FilesystemBacke
return (new S3DeleteAction());
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public QStorageInterface getStorageInterface()
{
return new S3StorageAction();
}
}

View File

@ -0,0 +1,114 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.module.filesystem.s3.actions;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.kingsrook.qqq.backend.core.actions.interfaces.QStorageInterface;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput;
import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata.S3BackendMetaData;
import com.kingsrook.qqq.backend.module.filesystem.s3.utils.S3UploadOutputStream;
/*******************************************************************************
** (mass, streamed) storage action for filesystem module
*******************************************************************************/
public class S3StorageAction extends AbstractS3Action implements QStorageInterface
{
/*******************************************************************************
**
*******************************************************************************/
@Override
public OutputStream createOutputStream(StorageInput storageInput) throws QException
{
try
{
S3BackendMetaData backend = (S3BackendMetaData) storageInput.getBackend();
AmazonS3 amazonS3 = buildAmazonS3ClientFromBackendMetaData(backend);
String fullPath = getFullPath(storageInput);
S3UploadOutputStream s3UploadOutputStream = new S3UploadOutputStream(amazonS3, backend.getBucketName(), fullPath);
return (s3UploadOutputStream);
}
catch(Exception e)
{
throw (new QException("Exception creating s3 output stream for file", e));
}
}
/*******************************************************************************
**
*******************************************************************************/
private String getFullPath(StorageInput storageInput)
{
QTableMetaData table = storageInput.getTable();
QBackendMetaData backend = storageInput.getBackend();
String fullPath = stripDuplicatedSlashes(getFullBasePath(table, backend) + File.separator + storageInput.getReference());
/////////////////////////////////////////////////////////////
// s3 seems to do better w/o leading slashes, so, strip... //
/////////////////////////////////////////////////////////////
if(fullPath.startsWith("/"))
{
fullPath = fullPath.substring(1);
}
return fullPath;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public InputStream getInputStream(StorageInput storageInput) throws QException
{
try
{
S3BackendMetaData backend = (S3BackendMetaData) storageInput.getBackend();
AmazonS3 amazonS3 = buildAmazonS3ClientFromBackendMetaData(backend);
String fullPath = getFullPath(storageInput);
GetObjectRequest getObjectRequest = new GetObjectRequest(backend.getBucketName(), fullPath);
S3Object s3Object = amazonS3.getObject(getObjectRequest);
S3ObjectInputStream objectContent = s3Object.getObjectContent();
return (objectContent);
}
catch(Exception e)
{
throw (new QException("Exception getting s3 input stream for file", e));
}
}
}

View File

@ -0,0 +1,188 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.module.filesystem.s3.utils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
/*******************************************************************************
** OutputStream implementation that knows how to stream data into a new S3 file.
**
** This will be done using a multipart-upload if the contents are > 5MB.
*******************************************************************************/
public class S3UploadOutputStream extends OutputStream
{
private final AmazonS3 amazonS3;
private final String bucketName;
private final String key;
private byte[] buffer = new byte[5 * 1024 * 1024];
private int offset = 0;
private InitiateMultipartUploadResult initiateMultipartUploadResult = null;
private List<UploadPartResult> uploadPartResultList = null;
/*******************************************************************************
** Constructor
**
*******************************************************************************/
public S3UploadOutputStream(AmazonS3 amazonS3, String bucketName, String key)
{
this.amazonS3 = amazonS3;
this.bucketName = bucketName;
this.key = key;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void write(int b) throws IOException
{
buffer[offset] = (byte) b;
offset++;
uploadIfNeeded();
}
/*******************************************************************************
**
*******************************************************************************/
private void uploadIfNeeded()
{
if(offset == buffer.length)
{
//////////////////////////////////////////
// start or continue a multipart upload //
//////////////////////////////////////////
if(initiateMultipartUploadResult == null)
{
initiateMultipartUploadResult = amazonS3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key));
uploadPartResultList = new ArrayList<>();
}
UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withUploadId(initiateMultipartUploadResult.getUploadId())
.withPartNumber(uploadPartResultList.size() + 1)
.withInputStream(new ByteArrayInputStream(buffer))
.withBucketName(bucketName)
.withKey(key)
.withPartSize(buffer.length);
uploadPartResultList.add(amazonS3.uploadPart(uploadPartRequest));
//////////////////
// reset buffer //
//////////////////
offset = 0;
}
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void write(byte[] b, int off, int len) throws IOException
{
int bytesToWrite = len;
while(bytesToWrite > buffer.length - offset)
{
int size = buffer.length - offset;
// System.out.println("A:copy " + size + " bytes from source[" + off + "] to dest[" + offset + "]");
System.arraycopy(b, off, buffer, offset, size);
offset = buffer.length;
uploadIfNeeded();
off += size;
bytesToWrite -= size;
}
int size = len - off;
// System.out.println("B:copy " + size + " bytes from source[" + off + "] to dest[" + offset + "]");
System.arraycopy(b, off, buffer, offset, size);
offset += size;
uploadIfNeeded();
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void close() throws IOException
{
if(initiateMultipartUploadResult != null)
{
if (offset > 0)
{
//////////////////////////////////////////////////
// if there's a final part to upload, do it now //
//////////////////////////////////////////////////
UploadPartRequest uploadPartRequest = new UploadPartRequest()
.withUploadId(initiateMultipartUploadResult.getUploadId())
.withPartNumber(uploadPartResultList.size() + 1)
.withInputStream(new ByteArrayInputStream(buffer, 0, offset))
.withBucketName(bucketName)
.withKey(key)
.withPartSize(offset);
uploadPartResultList.add(amazonS3.uploadPart(uploadPartRequest));
}
CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest()
.withUploadId(initiateMultipartUploadResult.getUploadId())
.withPartETags(uploadPartResultList)
.withBucketName(bucketName)
.withKey(key);
CompleteMultipartUploadResult completeMultipartUploadResult = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);
}
else
{
ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentLength(offset);
PutObjectResult putObjectResult = amazonS3.putObject(bucketName, key, new ByteArrayInputStream(buffer, 0, offset), objectMetadata);
}
}
}