mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-17 20:50:44 +00:00
CE-881 - Cleanups - string aggregates; json field names; excel sheet name cleansing; excel size limits; counts, etc
This commit is contained in:
@ -36,6 +36,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
|
||||
import com.amazonaws.services.s3.model.PutObjectResult;
|
||||
import com.amazonaws.services.s3.model.UploadPartRequest;
|
||||
import com.amazonaws.services.s3.model.UploadPartResult;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -45,16 +47,20 @@ import com.amazonaws.services.s3.model.UploadPartResult;
|
||||
*******************************************************************************/
|
||||
public class S3UploadOutputStream extends OutputStream
|
||||
{
|
||||
private static final QLogger LOG = QLogger.getLogger(S3UploadOutputStream.class);
|
||||
|
||||
private final AmazonS3 amazonS3;
|
||||
private final String bucketName;
|
||||
private final String key;
|
||||
|
||||
private byte[] buffer = new byte[5 * 1024 * 1024];
|
||||
private int offset = 0;
|
||||
private byte[] buffer = new byte[5 * 1024 * 1024];
|
||||
private int offset = 0;
|
||||
|
||||
private InitiateMultipartUploadResult initiateMultipartUploadResult = null;
|
||||
private List<UploadPartResult> uploadPartResultList = null;
|
||||
|
||||
private boolean isClosed = false;
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -96,10 +102,12 @@ public class S3UploadOutputStream extends OutputStream
|
||||
//////////////////////////////////////////
|
||||
if(initiateMultipartUploadResult == null)
|
||||
{
|
||||
LOG.info("Initiating a multipart upload", logPair("key", key));
|
||||
initiateMultipartUploadResult = amazonS3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key));
|
||||
uploadPartResultList = new ArrayList<>();
|
||||
}
|
||||
|
||||
LOG.info("Uploading a part", logPair("key", key), logPair("partNumber", uploadPartResultList.size() + 1));
|
||||
UploadPartRequest uploadPartRequest = new UploadPartRequest()
|
||||
.withUploadId(initiateMultipartUploadResult.getUploadId())
|
||||
.withPartNumber(uploadPartResultList.size() + 1)
|
||||
@ -130,7 +138,6 @@ public class S3UploadOutputStream extends OutputStream
|
||||
while(bytesToWrite > buffer.length - offset)
|
||||
{
|
||||
int size = buffer.length - offset;
|
||||
// System.out.println("A:copy " + size + " bytes from source[" + off + "] to dest[" + offset + "]");
|
||||
System.arraycopy(b, off, buffer, offset, size);
|
||||
offset = buffer.length;
|
||||
uploadIfNeeded();
|
||||
@ -139,7 +146,6 @@ public class S3UploadOutputStream extends OutputStream
|
||||
}
|
||||
|
||||
int size = len - off;
|
||||
// System.out.println("B:copy " + size + " bytes from source[" + off + "] to dest[" + offset + "]");
|
||||
System.arraycopy(b, off, buffer, offset, size);
|
||||
offset += size;
|
||||
uploadIfNeeded();
|
||||
@ -153,13 +159,20 @@ public class S3UploadOutputStream extends OutputStream
|
||||
@Override
|
||||
public void close() throws IOException
|
||||
{
|
||||
if(isClosed)
|
||||
{
|
||||
LOG.debug("Redundant call to close an already-closed S3UploadOutputStream. Returning with noop.", logPair("key", key));
|
||||
return;
|
||||
}
|
||||
|
||||
if(initiateMultipartUploadResult != null)
|
||||
{
|
||||
if (offset > 0)
|
||||
if(offset > 0)
|
||||
{
|
||||
//////////////////////////////////////////////////
|
||||
// if there's a final part to upload, do it now //
|
||||
//////////////////////////////////////////////////
|
||||
LOG.info("Uploading a part", logPair("key", key), logPair("isFinalPart", true), logPair("partNumber", uploadPartResultList.size() + 1));
|
||||
UploadPartRequest uploadPartRequest = new UploadPartRequest()
|
||||
.withUploadId(initiateMultipartUploadResult.getUploadId())
|
||||
.withPartNumber(uploadPartResultList.size() + 1)
|
||||
@ -179,10 +192,13 @@ public class S3UploadOutputStream extends OutputStream
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.info("Putting object (non-multipart)", logPair("key", key), logPair("length", offset));
|
||||
ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||
objectMetadata.setContentLength(offset);
|
||||
PutObjectResult putObjectResult = amazonS3.putObject(bucketName, key, new ByteArrayInputStream(buffer, 0, offset), objectMetadata);
|
||||
}
|
||||
|
||||
isClosed = true;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -63,16 +63,16 @@ public class TestUtils
|
||||
public static final String BACKEND_NAME_S3 = "s3";
|
||||
public static final String BACKEND_NAME_S3_SANS_PREFIX = "s3sansPrefix";
|
||||
public static final String BACKEND_NAME_MOCK = "mock";
|
||||
public static final String BACKEND_NAME_MEMORY = "memory";
|
||||
public static final String BACKEND_NAME_MEMORY = "memory";
|
||||
|
||||
public static final String TABLE_NAME_PERSON_LOCAL_FS_JSON = "person-local-json";
|
||||
public static final String TABLE_NAME_PERSON_LOCAL_FS_CSV = "person-local-csv";
|
||||
public static final String TABLE_NAME_BLOB_LOCAL_FS = "local-blob";
|
||||
public static final String TABLE_NAME_ARCHIVE_LOCAL_FS = "local-archive";
|
||||
public static final String TABLE_NAME_ARCHIVE_LOCAL_FS = "local-archive";
|
||||
public static final String TABLE_NAME_PERSON_S3 = "person-s3";
|
||||
public static final String TABLE_NAME_BLOB_S3 = "s3-blob";
|
||||
public static final String TABLE_NAME_PERSON_MOCK = "person-mock";
|
||||
public static final String TABLE_NAME_BLOB_S3_SANS_PREFIX = "s3-blob-sans-prefix";
|
||||
public static final String TABLE_NAME_BLOB_S3_SANS_PREFIX = "s3-blob-sans-prefix";
|
||||
|
||||
public static final String PROCESS_NAME_STREAMED_ETL = "etl.streamed";
|
||||
public static final String LOCAL_PERSON_CSV_FILE_IMPORTER_PROCESS_NAME = "localPersonCsvFileImporter";
|
||||
|
Reference in New Issue
Block a user