QQQ-37 adding pre & post run to ETL transform & load; minor QOL

This commit is contained in:
2022-08-30 13:44:34 -05:00
parent 6142b8e703
commit 69b9ed5b19
14 changed files with 189 additions and 34 deletions

View File

@ -41,6 +41,9 @@ public class RecordPipe
{
private static final Logger LOG = LogManager.getLogger(RecordPipe.class);
private static final long BLOCKING_SLEEP_MILLIS = 100;
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(1_000);
private boolean isTerminated = false;
@ -53,6 +56,7 @@ public class RecordPipe
private List<QRecord> singleRecordListForPostRecordActions = new ArrayList<>();
/*******************************************************************************
** Turn off the pipe. Stop accepting new records (just ignore them in the add
** method). Clear the existing queue. Don't return any more records. Note that
@ -103,11 +107,22 @@ public class RecordPipe
{
boolean offerResult = queue.offer(record);
while(!offerResult && !isTerminated)
if(!offerResult && !isTerminated)
{
LOG.debug("Record pipe.add failed (due to full pipe). Blocking.");
SleepUtils.sleep(100, TimeUnit.MILLISECONDS);
offerResult = queue.offer(record);
long sleepLoopStartTime = System.currentTimeMillis();
long now = System.currentTimeMillis();
while(!offerResult && !isTerminated)
{
if(now - sleepLoopStartTime > MAX_SLEEP_LOOP_MILLIS)
{
LOG.warn("Giving up adding record to pipe, due to pipe being full for more than {} millis", MAX_SLEEP_LOOP_MILLIS);
throw (new IllegalStateException("Giving up adding record to pipe, due to pipe staying full too long."));
}
LOG.debug("Record pipe.add failed (due to full pipe). Blocking.");
SleepUtils.sleep(BLOCKING_SLEEP_MILLIS, TimeUnit.MILLISECONDS);
offerResult = queue.offer(record);
now = System.currentTimeMillis();
}
}
}

View File

@ -89,7 +89,7 @@ public class QPossibleValueTranslator
*******************************************************************************/
public void translatePossibleValuesInRecords(QTableMetaData table, List<QRecord> records)
{
if(records == null)
if(records == null || table == null)
{
return;
}

View File

@ -307,8 +307,8 @@ public class QInstanceEnricher
.withFormField(new QFieldMetaData("theFile", QFieldType.BLOB).withIsRequired(true))
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
// .withValue("text", "Upload a CSV or XLSX file with the following columns: " + fieldsForHelpText));
.withValue("text", "Upload a CSV file with the following columns: " + fieldsForHelpText));
.withValue("text", "Upload a CSV file with the following columns: " + fieldsForHelpText))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.EDIT_FORM));
QBackendStepMetaData receiveFileStep = new QBackendStepMetaData()
.withName("receiveFile")
@ -322,7 +322,9 @@ public class QInstanceEnricher
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
.withValue("text", "The records below were parsed from your file, and will be inserted if you click Submit."))
.withViewField(new QFieldMetaData("noOfFileRows", QFieldType.INTEGER).withLabel("# of file rows"));
.withViewField(new QFieldMetaData("noOfFileRows", QFieldType.INTEGER).withLabel("# of file rows"))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.VIEW_FORM))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.RECORD_LIST));
QBackendStepMetaData storeStep = new QBackendStepMetaData()
.withName("storeRecords")
@ -336,7 +338,9 @@ public class QInstanceEnricher
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
.withValue("text", "The records below have been inserted."))
.withViewField(new QFieldMetaData("noOfFileRows", QFieldType.INTEGER).withLabel("# of file rows"));
.withViewField(new QFieldMetaData("noOfFileRows", QFieldType.INTEGER).withLabel("# of file rows"))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.VIEW_FORM))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.RECORD_LIST));
qInstance.addProcess(
new QProcessMetaData()
@ -375,9 +379,7 @@ public class QInstanceEnricher
The values you supply here will be updated in all of the records you are bulk editing.
You can clear out the value in a field by flipping the switch on for that field and leaving the input field blank.
Fields whose switches are off will not be updated."""))
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.BULK_EDIT_FORM)
);
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.BULK_EDIT_FORM));
QBackendStepMetaData receiveValuesStep = new QBackendStepMetaData()
.withName("receiveValues")
@ -393,7 +395,9 @@ public class QInstanceEnricher
.withViewField(new QFieldMetaData(BulkEditReceiveValuesStep.FIELD_VALUES_BEING_UPDATED, QFieldType.STRING))
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
.withValue("text", "The records below will be updated if you click Submit."));
.withValue("text", "The records below will be updated if you click Submit."))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.VIEW_FORM))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.RECORD_LIST));
QBackendStepMetaData storeStep = new QBackendStepMetaData()
.withName("storeRecords")
@ -407,7 +411,9 @@ public class QInstanceEnricher
.withViewField(new QFieldMetaData(BulkEditReceiveValuesStep.FIELD_VALUES_BEING_UPDATED, QFieldType.STRING))
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
.withValue("text", "The records below have been updated."));
.withValue("text", "The records below have been updated."))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.VIEW_FORM))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.RECORD_LIST));
qInstance.addProcess(
new QProcessMetaData()
@ -437,7 +443,8 @@ public class QInstanceEnricher
.withRecordListFields(new ArrayList<>(table.getFields().values()))
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
.withValue("text", "The records below will be deleted if you click Submit."));
.withValue("text", "The records below will be deleted if you click Submit."))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.RECORD_LIST));
QBackendStepMetaData storeStep = new QBackendStepMetaData()
.withName("delete")
@ -448,7 +455,8 @@ public class QInstanceEnricher
.withRecordListFields(new ArrayList<>(table.getFields().values()))
.withComponent(new QFrontendComponentMetaData()
.withType(QComponentType.HELP_TEXT)
.withValue("text", "The records below have been deleted."));
.withValue("text", "The records below have been deleted."))
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.RECORD_LIST));
qInstance.addProcess(
new QProcessMetaData()
@ -569,6 +577,7 @@ public class QInstanceEnricher
}
/*******************************************************************************
** If a table didn't have any sections, generate "sensible defaults"
*******************************************************************************/

View File

@ -76,6 +76,17 @@ public class QFieldMetaData
/*******************************************************************************
**
*******************************************************************************/
@Override
public String toString()
{
return ("QFieldMetaData[" + name + "]");
}
/*******************************************************************************
** Initialize a fieldMetaData from a reference to a getter on an entity.
** e.g., new QFieldMetaData(Order::getOrderNo).

View File

@ -51,6 +51,17 @@ public class QProcessMetaData implements QAppChildMetaData
/*******************************************************************************
**
*******************************************************************************/
@Override
public String toString()
{
return ("QProcessMetaData[" + name + "]");
}
/*******************************************************************************
** Getter for name
**

View File

@ -8,6 +8,7 @@ import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
@ -29,6 +30,32 @@ public abstract class AbstractLoadStep implements BackendStep
/*******************************************************************************
** Allow subclasses to do an action before the run is complete - before any
** pages of records are passed in.
*******************************************************************************/
public void preRun(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
////////////////////////
// noop in base class //
////////////////////////
}
/*******************************************************************************
** Allow subclasses to do an action after the run is complete - after the last
** page of records is passed in.
*******************************************************************************/
public void postRun(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
////////////////////////
// noop in base class //
////////////////////////
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -4,6 +4,9 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
@ -16,11 +19,37 @@ import com.kingsrook.qqq.backend.core.model.data.QRecord;
*******************************************************************************/
public abstract class AbstractTransformStep implements BackendStep
{
private List<QRecord> inputRecordPage = new ArrayList<>();
private List<QRecord> inputRecordPage = new ArrayList<>();
private List<QRecord> outputRecordPage = new ArrayList<>();
/*******************************************************************************
** Allow subclasses to do an action before the run is complete - before any
** pages of records are passed in.
*******************************************************************************/
public void preRun(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
////////////////////////
// noop in base class //
////////////////////////
}
/*******************************************************************************
** Allow subclasses to do an action after the run is complete - after the last
** page of records is passed in.
*******************************************************************************/
public void postRun(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
////////////////////////
// noop in base class //
////////////////////////
}
/*******************************************************************************
** Getter for recordPage
**

View File

@ -1,9 +1,14 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator;
import com.kingsrook.qqq.backend.core.actions.values.QValueFormatter;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
/*******************************************************************************
@ -46,4 +51,23 @@ public class BaseStreamedETLStep
return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference));
}
/*******************************************************************************
**
*******************************************************************************/
protected void updateRecordsWithDisplayValuesAndPossibleValues(RunBackendStepInput input, List<QRecord> list)
{
String destinationTable = input.getValueString(StreamedETLWithFrontendProcess.FIELD_DESTINATION_TABLE);
QTableMetaData table = input.getInstance().getTable(destinationTable);
if(table != null && list != null)
{
QValueFormatter qValueFormatter = new QValueFormatter();
qValueFormatter.setDisplayValuesInRecords(table, list);
QPossibleValueTranslator qPossibleValueTranslator = new QPossibleValueTranslator(input.getInstance(), input.getSession());
qPossibleValueTranslator.translatePossibleValuesInRecords(input.getTable(), list);
}
}
}

View File

@ -67,6 +67,8 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
AbstractLoadStep loadStep = getLoadStep(runBackendStepInput);
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
transaction = loadStep.openTransaction(runBackendStepInput);
loadStep.setTransaction(transaction);
@ -80,15 +82,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
);
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT, recordCount);
runBackendStepOutput.setRecords(loadedRecordList);
/////////////////////
// commit the work //
/////////////////////
if(transaction.isPresent())
{
transaction.get().commit();
}
updateRecordsWithDisplayValuesAndPossibleValues(runBackendStepInput, loadedRecordList);
runBackendStepOutput.setRecords(loadedRecordList);
if(transformStep instanceof ProcessSummaryProviderInterface processSummaryProvider)
{
@ -98,6 +94,17 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
//////////////////////////////////////////////////////////////////////////////////////////////
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_PROCESS_SUMMARY, processSummaryProvider.getProcessSummary(true));
}
transformStep.postRun(runBackendStepInput, runBackendStepOutput);
loadStep.postRun(runBackendStepInput, runBackendStepOutput);
/////////////////////
// commit the work //
/////////////////////
if(transaction.isPresent())
{
transaction.get().commit();
}
}
catch(Exception e)
{

View File

@ -90,6 +90,7 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
extractStep.setRecordPipe(recordPipe);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
List<QRecord> previewRecordList = new ArrayList<>();
new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) ->
@ -100,7 +101,10 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
() -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, previewRecordList))
);
updateRecordsWithDisplayValuesAndPossibleValues(runBackendStepInput, previewRecordList);
runBackendStepOutput.setRecords(previewRecordList);
transformStep.postRun(runBackendStepInput, runBackendStepOutput);
}

View File

@ -94,11 +94,13 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
if(!(transformStep instanceof ProcessSummaryProviderInterface processSummaryProvider))
{
// todo - really? if this is required, then put it on the AbstractTransformStep class
throw (new QException("Transform Step " + transformStep.getClass().getName() + " does not implement ProcessSummaryProviderInterface."));
}
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
List<QRecord> previewRecordList = new ArrayList<>();
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Preview>ValidateStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) ->
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Preview>ValidateStep", null, recordPipe, (status) ->
{
extractStep.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput);
@ -106,6 +108,7 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
() -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, previewRecordList))
);
updateRecordsWithDisplayValuesAndPossibleValues(runBackendStepInput, previewRecordList);
runBackendStepOutput.setRecords(previewRecordList);
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT, recordCount);
@ -113,6 +116,8 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
// get the process summary from the validation step //
//////////////////////////////////////////////////////
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_VALIDATION_SUMMARY, processSummaryProvider.getProcessSummary(false));
transformStep.postRun(runBackendStepInput, runBackendStepOutput);
}

View File

@ -78,6 +78,10 @@ public class StreamedETLWithFrontendProcess
public static final String FIELD_VALIDATION_SUMMARY = "validationSummary"; // List<ProcessSummaryLine>
public static final String FIELD_PROCESS_SUMMARY = "processResults"; // List<ProcessSummaryLine>
public static final String DEFAULT_PREVIEW_MESSAGE_FOR_INSERT = "This is a preview of the records that will be created.";
public static final String DEFAULT_PREVIEW_MESSAGE_FOR_UPDATE = "This is a preview of the records that will be updated.";
public static final String FIELD_PREVIEW_MESSAGE = "previewMessage";
/*******************************************************************************
@ -106,6 +110,7 @@ public class StreamedETLWithFrontendProcess
** - FIELD_SUPPORTS_FULL_VALIDATION
** - FIELD_DEFAULT_QUERY_FILTER
** - FIELD_DO_FULL_VALIDATION
** - FIELD_PREVIEW_MESSAGE
*******************************************************************************/
public static QProcessMetaData defineProcessMetaData(
Class<? extends AbstractExtractStep> extractStepClass,
@ -119,10 +124,12 @@ public class StreamedETLWithFrontendProcess
.withCode(new QCodeReference(StreamedETLPreviewStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData(FIELD_SOURCE_TABLE, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_SOURCE_TABLE)))
.withField(new QFieldMetaData(FIELD_DESTINATION_TABLE, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DESTINATION_TABLE)))
.withField(new QFieldMetaData(FIELD_SUPPORTS_FULL_VALIDATION, QFieldType.BOOLEAN).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_SUPPORTS_FULL_VALIDATION, false)))
.withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(new QCodeReference(extractStepClass)))
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(new QCodeReference(transformStepClass)))
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
);
QFrontendStepMetaData reviewStep = new QFrontendStepMetaData()
@ -142,7 +149,6 @@ public class StreamedETLWithFrontendProcess
.withName(STEP_NAME_EXECUTE)
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData(FIELD_DESTINATION_TABLE, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DESTINATION_TABLE)))
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(new QCodeReference(loadStepClass))))
.withOutputMetaData(new QFunctionOutputMetaData()
.withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING))

View File

@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.javalin;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@ -443,16 +444,21 @@ public class QJavalinProcessHandler
}
ProcessState processState = optionalProcessState.get();
List<QRecord> records = processState.getRecords();
if(CollectionUtils.nullSafeIsEmpty(records))
List<QRecord> records = processState.getRecords();
Map<String, Object> resultForCaller = new HashMap<>();
if(records == null)
{
throw (new Exception("No records were found for the process."));
resultForCaller.put("records", new ArrayList<>());
resultForCaller.put("totalRecords", 0);
}
else
{
List<QRecord> recordPage = CollectionUtils.safelyGetPage(records, skip, limit);
resultForCaller.put("records", recordPage);
resultForCaller.put("totalRecords", records.size());
}
Map<String, Object> resultForCaller = new HashMap<>();
List<QRecord> recordPage = CollectionUtils.safelyGetPage(records, skip, limit);
resultForCaller.put("records", recordPage);
resultForCaller.put("totalRecords", records.size());
context.result(JsonUtils.toJson(resultForCaller));
}
catch(Exception e)

View File

@ -402,6 +402,7 @@ public class SampleMetaDataProvider
values.put(StreamedETLWithFrontendProcess.FIELD_SOURCE_TABLE, TABLE_NAME_PERSON);
values.put(StreamedETLWithFrontendProcess.FIELD_DESTINATION_TABLE, TABLE_NAME_PERSON);
values.put(StreamedETLWithFrontendProcess.FIELD_SUPPORTS_FULL_VALIDATION, true);
values.put(StreamedETLWithFrontendProcess.FIELD_PREVIEW_MESSAGE, "This is a preview of what the clones will look like.");
QProcessMetaData process = StreamedETLWithFrontendProcess.defineProcessMetaData(
ExtractViaQueryStep.class,