mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-20 22:18:43 +00:00
CE-1955 - Boosting test-coverage during bulk-load rollout
This commit is contained in:
@ -73,13 +73,13 @@ import com.kingsrook.qqq.backend.core.processes.implementations.bulk.delete.Bulk
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.delete.BulkDeleteTransformStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.edit.BulkEditLoadStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.edit.BulkEditTransformStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertExtractStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertLoadStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertPrepareFileMappingStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertPrepareValueMappingStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertReceiveFileMappingStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertReceiveValueMappingStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertTransformStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.BulkInsertV2ExtractStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
|
||||
import com.kingsrook.qqq.backend.core.scheduler.QScheduleManager;
|
||||
@ -819,7 +819,7 @@ public class QInstanceEnricher
|
||||
values.put(StreamedETLWithFrontendProcess.FIELD_DESTINATION_TABLE, table.getName());
|
||||
|
||||
QProcessMetaData process = StreamedETLWithFrontendProcess.defineProcessMetaData(
|
||||
BulkInsertV2ExtractStep.class,
|
||||
BulkInsertExtractStep.class,
|
||||
BulkInsertTransformStep.class,
|
||||
BulkInsertLoadStep.class,
|
||||
values
|
||||
|
@ -22,84 +22,106 @@
|
||||
package com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert;
|
||||
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.adapters.CsvToQRecordAdapter;
|
||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||
import java.util.Objects;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.StorageAction;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QUserFacingException;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.QUploadedFile;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.shared.mapping.QKeyBasedFieldMapping;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.filehandling.FileToRowsInterface;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.mapping.RowsToRecordInterface;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.model.BulkInsertMapping;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.model.BulkLoadFileRow;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractExtractStep;
|
||||
import com.kingsrook.qqq.backend.core.state.AbstractStateKey;
|
||||
import com.kingsrook.qqq.backend.core.state.TempFileStateProvider;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Extract step for generic table bulk-insert ETL process
|
||||
**
|
||||
** This step does a little bit of transforming, actually - taking rows from
|
||||
** an uploaded file, and potentially merging them (for child-table use-cases)
|
||||
** and applying the "Mapping" - to put fully built records into the pipe for the
|
||||
** Transform step.
|
||||
*******************************************************************************/
|
||||
public class BulkInsertExtractStep extends AbstractExtractStep
|
||||
{
|
||||
|
||||
/***************************************************************************
|
||||
**
|
||||
***************************************************************************/
|
||||
@Override
|
||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
AbstractStateKey stateKey = (AbstractStateKey) runBackendStepInput.getValue(QUploadedFile.DEFAULT_UPLOADED_FILE_FIELD_NAME);
|
||||
Optional<QUploadedFile> optionalUploadedFile = TempFileStateProvider.getInstance().get(QUploadedFile.class, stateKey);
|
||||
if(optionalUploadedFile.isEmpty())
|
||||
int rowsAdded = 0;
|
||||
int originalLimit = Objects.requireNonNullElse(getLimit(), Integer.MAX_VALUE);
|
||||
|
||||
StorageInput storageInput = BulkInsertStepUtils.getStorageInputForTheFile(runBackendStepInput);
|
||||
BulkInsertMapping bulkInsertMapping = (BulkInsertMapping) runBackendStepOutput.getValue("bulkInsertMapping");
|
||||
RowsToRecordInterface rowsToRecord = bulkInsertMapping.getLayout().newRowsToRecordInterface();
|
||||
|
||||
try
|
||||
(
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// open a stream to read from our file, and a FileToRows object, that knows how to read from that stream //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
InputStream inputStream = new StorageAction().getInputStream(storageInput);
|
||||
FileToRowsInterface fileToRowsInterface = FileToRowsInterface.forFile(storageInput.getReference(), inputStream);
|
||||
)
|
||||
{
|
||||
throw (new QException("Could not find uploaded file"));
|
||||
}
|
||||
///////////////////////////////////////////////////////////
|
||||
// read the header row (if this file & mapping uses one) //
|
||||
///////////////////////////////////////////////////////////
|
||||
BulkLoadFileRow headerRow = bulkInsertMapping.getHasHeaderRow() ? fileToRowsInterface.next() : null;
|
||||
|
||||
byte[] bytes = optionalUploadedFile.get().getBytes();
|
||||
String fileName = optionalUploadedFile.get().getFilename();
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// while there are more rows in the file - and we're under the limit - get more records form the file //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
while(fileToRowsInterface.hasNext() && rowsAdded < originalLimit)
|
||||
{
|
||||
int remainingLimit = originalLimit - rowsAdded;
|
||||
|
||||
/////////////////////////////////////////////////////
|
||||
// let the user specify field labels instead names //
|
||||
/////////////////////////////////////////////////////
|
||||
QTableMetaData table = runBackendStepInput.getTable();
|
||||
String tableName = runBackendStepInput.getTableName();
|
||||
QKeyBasedFieldMapping mapping = new QKeyBasedFieldMapping();
|
||||
for(Map.Entry<String, QFieldMetaData> entry : table.getFields().entrySet())
|
||||
{
|
||||
mapping.addMapping(entry.getKey(), entry.getValue().getLabel());
|
||||
}
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// put a page-size limit on the rows-to-record class, so it won't be tempted to do whole file all at once //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
int pageLimit = Math.min(remainingLimit, getMaxPageSize());
|
||||
List<QRecord> page = rowsToRecord.nextPage(fileToRowsInterface, headerRow, bulkInsertMapping, pageLimit);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
// get the non-editable fields - they'll be blanked out in a customizer //
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
List<QFieldMetaData> nonEditableFields = table.getFields().values().stream()
|
||||
.filter(f -> !f.getIsEditable())
|
||||
.toList();
|
||||
|
||||
if(fileName.toLowerCase(Locale.ROOT).endsWith(".csv"))
|
||||
{
|
||||
new CsvToQRecordAdapter().buildRecordsFromCsv(new CsvToQRecordAdapter.InputWrapper()
|
||||
.withRecordPipe(getRecordPipe())
|
||||
.withLimit(getLimit())
|
||||
.withCsv(new String(bytes))
|
||||
.withDoCorrectValueTypes(true)
|
||||
.withTable(QContext.getQInstance().getTable(tableName))
|
||||
.withMapping(mapping)
|
||||
.withRecordCustomizer((record) ->
|
||||
if(page.size() > remainingLimit)
|
||||
{
|
||||
////////////////////////////////////////////
|
||||
// remove values from non-editable fields //
|
||||
////////////////////////////////////////////
|
||||
for(QFieldMetaData nonEditableField : nonEditableFields)
|
||||
{
|
||||
record.setValue(nonEditableField.getName(), null);
|
||||
}
|
||||
}));
|
||||
/////////////////////////////////////////////////////////////
|
||||
// in case we got back more than we asked for, sub-list it //
|
||||
/////////////////////////////////////////////////////////////
|
||||
page = page.subList(0, remainingLimit);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////
|
||||
// send this page of records into the pipe //
|
||||
/////////////////////////////////////////////
|
||||
getRecordPipe().addRecords(page);
|
||||
rowsAdded += page.size();
|
||||
}
|
||||
}
|
||||
else
|
||||
catch(QException qe)
|
||||
{
|
||||
throw (new QUserFacingException("Unsupported file type."));
|
||||
throw qe;
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
throw new QException("Unhandled error in bulk insert extract step", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/***************************************************************************
|
||||
**
|
||||
***************************************************************************/
|
||||
private int getMaxPageSize()
|
||||
{
|
||||
return (1000);
|
||||
}
|
||||
}
|
||||
|
@ -1,127 +0,0 @@
|
||||
/*
|
||||
* QQQ - Low-code Application Framework for Engineers.
|
||||
* Copyright (C) 2021-2022. Kingsrook, LLC
|
||||
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||
* contact@kingsrook.com
|
||||
* https://github.com/Kingsrook/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert;
|
||||
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.StorageAction;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.filehandling.FileToRowsInterface;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.mapping.RowsToRecordInterface;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.model.BulkInsertMapping;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.model.BulkLoadFileRow;
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractExtractStep;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Extract step for generic table bulk-insert ETL process
|
||||
**
|
||||
** This step does a little bit of transforming, actually - taking rows from
|
||||
** an uploaded file, and potentially merging them (for child-table use-cases)
|
||||
** and applying the "Mapping" - to put fully built records into the pipe for the
|
||||
** Transform step.
|
||||
*******************************************************************************/
|
||||
public class BulkInsertV2ExtractStep extends AbstractExtractStep
|
||||
{
|
||||
|
||||
/***************************************************************************
|
||||
**
|
||||
***************************************************************************/
|
||||
@Override
|
||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
int rowsAdded = 0;
|
||||
int originalLimit = Objects.requireNonNullElse(getLimit(), Integer.MAX_VALUE);
|
||||
|
||||
StorageInput storageInput = BulkInsertStepUtils.getStorageInputForTheFile(runBackendStepInput);
|
||||
BulkInsertMapping bulkInsertMapping = (BulkInsertMapping) runBackendStepOutput.getValue("bulkInsertMapping");
|
||||
RowsToRecordInterface rowsToRecord = bulkInsertMapping.getLayout().newRowsToRecordInterface();
|
||||
|
||||
try
|
||||
(
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// open a stream to read from our file, and a FileToRows object, that knows how to read from that stream //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
InputStream inputStream = new StorageAction().getInputStream(storageInput);
|
||||
FileToRowsInterface fileToRowsInterface = FileToRowsInterface.forFile(storageInput.getReference(), inputStream);
|
||||
)
|
||||
{
|
||||
///////////////////////////////////////////////////////////
|
||||
// read the header row (if this file & mapping uses one) //
|
||||
///////////////////////////////////////////////////////////
|
||||
BulkLoadFileRow headerRow = bulkInsertMapping.getHasHeaderRow() ? fileToRowsInterface.next() : null;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// while there are more rows in the file - and we're under the limit - get more records form the file //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
while(fileToRowsInterface.hasNext() && rowsAdded < originalLimit)
|
||||
{
|
||||
int remainingLimit = originalLimit - rowsAdded;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// put a page-size limit on the rows-to-record class, so it won't be tempted to do whole file all at once //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
int pageLimit = Math.min(remainingLimit, getMaxPageSize());
|
||||
List<QRecord> page = rowsToRecord.nextPage(fileToRowsInterface, headerRow, bulkInsertMapping, pageLimit);
|
||||
|
||||
if(page.size() > remainingLimit)
|
||||
{
|
||||
/////////////////////////////////////////////////////////////
|
||||
// in case we got back more than we asked for, sub-list it //
|
||||
/////////////////////////////////////////////////////////////
|
||||
page = page.subList(0, remainingLimit);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////
|
||||
// send this page of records into the pipe //
|
||||
/////////////////////////////////////////////
|
||||
getRecordPipe().addRecords(page);
|
||||
rowsAdded += page.size();
|
||||
}
|
||||
}
|
||||
catch(QException qe)
|
||||
{
|
||||
throw qe;
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
throw new QException("Unhandled error in bulk insert extract step", e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/***************************************************************************
|
||||
**
|
||||
***************************************************************************/
|
||||
private int getMaxPageSize()
|
||||
{
|
||||
return (1000);
|
||||
}
|
||||
}
|
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
||||
|
||||
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
|
||||
@ -33,6 +34,7 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp
|
||||
*******************************************************************************/
|
||||
public class NoopLoadStep extends AbstractLoadStep
|
||||
{
|
||||
private static final QLogger LOG = QLogger.getLogger(NoopLoadStep.class);
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -45,6 +47,7 @@ public class NoopLoadStep extends AbstractLoadStep
|
||||
///////////
|
||||
// noop. //
|
||||
///////////
|
||||
LOG.trace("noop");
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user