Checkpoint - working versions of streamed with frontend processes, with validation

This commit is contained in:
2022-08-29 13:33:35 -05:00
parent 5f8f063b99
commit cb22f86793
31 changed files with 1253 additions and 134 deletions

View File

@ -119,7 +119,9 @@ public class QCodeLoader
try
{
Class<?> customizerClass = Class.forName(codeReference.getName());
return ((T) customizerClass.getConstructor().newInstance());
@SuppressWarnings("unchecked")
T t = (T) customizerClass.getConstructor().newInstance();
return t;
}
catch(Exception e)
{

View File

@ -24,6 +24,7 @@ public enum TableCustomizers
{
POST_QUERY_RECORD(new TableCustomizer("postQueryRecord", Function.class, ((Object x) ->
{
@SuppressWarnings("unchecked")
Function<QRecord, QRecord> function = (Function<QRecord, QRecord>) x;
QRecord output = function.apply(new QRecord());
})));

View File

@ -82,15 +82,28 @@ public class RunProcessAction
runProcessOutput.setProcessUUID(runProcessInput.getProcessUUID());
UUIDAndTypeStateKey stateKey = new UUIDAndTypeStateKey(UUID.fromString(runProcessInput.getProcessUUID()), StateType.PROCESS_STATUS);
ProcessState processState = primeProcessState(runProcessInput, stateKey);
ProcessState processState = primeProcessState(runProcessInput, stateKey, process);
// todo - custom routing
List<QStepMetaData> stepList = getAvailableStepList(process, runProcessInput);
try
{
String lastStepName = runProcessInput.getStartAfterStep();
STEP_LOOP:
for(QStepMetaData step : stepList)
while(true)
{
///////////////////////////////////////////////////////////////////////////////////////////////////////
// always refresh the step list - as any step that runs can modify it (in the process state). //
// this is why we don't do a loop over the step list - as we'd get ConcurrentModificationExceptions. //
///////////////////////////////////////////////////////////////////////////////////////////////////////
List<QStepMetaData> stepList = getAvailableStepList(processState, process, lastStepName);
if(stepList.isEmpty())
{
break;
}
QStepMetaData step = stepList.get(0);
lastStepName = step.getName();
if(step instanceof QFrontendStepMetaData)
{
////////////////////////////////////////////////////////////////
@ -127,6 +140,7 @@ public class RunProcessAction
///////////////////////
// Run backend steps //
///////////////////////
LOG.info("Running backend step [" + step.getName() + "] in process [" + process.getName() + "]");
runBackendStep(runProcessInput, process, runProcessOutput, stateKey, backendStepMetaData, process, processState);
}
else
@ -169,7 +183,7 @@ public class RunProcessAction
** When we start running a process (or resuming it), get data in the RunProcessRequest
** either from the state provider (if they're found, for a resume).
*******************************************************************************/
ProcessState primeProcessState(RunProcessInput runProcessInput, UUIDAndTypeStateKey stateKey) throws QException
ProcessState primeProcessState(RunProcessInput runProcessInput, UUIDAndTypeStateKey stateKey, QProcessMetaData process) throws QException
{
Optional<ProcessState> optionalProcessState = loadState(stateKey);
if(optionalProcessState.isEmpty())
@ -177,11 +191,13 @@ public class RunProcessAction
if(runProcessInput.getStartAfterStep() == null)
{
///////////////////////////////////////////////////////////////////////////////////
// this is fine - it means its our first time running in the backend. //
// this is fine - it means it's our first time running in the backend. //
// Go ahead and store the state that we have (e.g., w/ initial records & values) //
///////////////////////////////////////////////////////////////////////////////////
storeState(stateKey, runProcessInput.getProcessState());
optionalProcessState = Optional.of(runProcessInput.getProcessState());
ProcessState processState = runProcessInput.getProcessState();
processState.setStepList(process.getStepList().stream().map(QStepMetaData::getName).toList());
storeState(stateKey, processState);
optionalProcessState = Optional.of(processState);
}
else
{
@ -249,41 +265,63 @@ public class RunProcessAction
/*******************************************************************************
** Get the list of steps which are eligible to run.
*******************************************************************************/
private List<QStepMetaData> getAvailableStepList(QProcessMetaData process, RunProcessInput runProcessInput)
private List<QStepMetaData> getAvailableStepList(ProcessState processState, QProcessMetaData process, String lastStep) throws QException
{
if(runProcessInput.getStartAfterStep() == null)
if(lastStep == null)
{
/////////////////////////////////////////////////////////////////////////////
// if the caller did not supply a 'startAfterStep', then use the full list //
/////////////////////////////////////////////////////////////////////////////
return (process.getStepList());
///////////////////////////////////////////////////////////////////////
// if the caller did not supply a 'lastStep', then use the full list //
///////////////////////////////////////////////////////////////////////
return (stepNamesToSteps(process, processState.getStepList()));
}
else
{
////////////////////////////////////////////////////////////////////////////////
// else, loop until the startAfterStep is found, and return the ones after it //
////////////////////////////////////////////////////////////////////////////////
boolean foundStartAfterStep = false;
List<QStepMetaData> rs = new ArrayList<>();
////////////////////////////////////////////////////////////////////////////
// else, loop until the 'lastStep' is found, and return the ones after it //
////////////////////////////////////////////////////////////////////////////
boolean foundLastStep = false;
List<String> validStepNames = new ArrayList<>();
for(QStepMetaData step : process.getStepList())
for(String stepName : processState.getStepList())
{
if(foundStartAfterStep)
if(foundLastStep)
{
rs.add(step);
validStepNames.add(stepName);
}
if(step.getName().equals(runProcessInput.getStartAfterStep()))
if(stepName.equals(lastStep))
{
foundStartAfterStep = true;
foundLastStep = true;
}
}
return (rs);
return (stepNamesToSteps(process, validStepNames));
}
}
/*******************************************************************************
**
*******************************************************************************/
private List<QStepMetaData> stepNamesToSteps(QProcessMetaData process, List<String> stepNames) throws QException
{
List<QStepMetaData> result = new ArrayList<>();
for(String stepName : stepNames)
{
QStepMetaData step = process.getStep(stepName);
if(step == null)
{
throw(new QException("Could not find a step named [" + stepName + "] in this process."));
}
result.add(step);
}
return (result);
}
/*******************************************************************************
** Load an instance of the appropriate state provider
**

View File

@ -38,6 +38,7 @@ public class ProcessState implements Serializable
{
private List<QRecord> records = new ArrayList<>();
private Map<String, Serializable> values = new HashMap<>();
private List<String> stepList = new ArrayList<>();
private Optional<String> nextStepName = Optional.empty();
@ -117,4 +118,25 @@ public class ProcessState implements Serializable
this.nextStepName = Optional.empty();
}
/*******************************************************************************
** Getter for stepList
**
*******************************************************************************/
public List<String> getStepList()
{
return stepList;
}
/*******************************************************************************
** Setter for stepList
**
*******************************************************************************/
public void setStepList(List<String> stepList)
{
this.stepList = stepList;
}
}

View File

@ -0,0 +1,180 @@
package com.kingsrook.qqq.backend.core.model.actions.processes;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
/*******************************************************************************
** For processes that may show a review & result screen, this class provides a
** standard way to summarize information about the records in the process.
**
*******************************************************************************/
public class ProcessSummaryLine implements Serializable
{
private Status status;
private Integer count;
private String message;
//////////////////////////////////////////////////////////////////////////
// using ArrayList, because we need to be Serializable, and List is not //
//////////////////////////////////////////////////////////////////////////
private ArrayList<Serializable> primaryKeys;
/*******************************************************************************
**
*******************************************************************************/
public ProcessSummaryLine(Status status, Integer count, String message, ArrayList<Serializable> primaryKeys)
{
this.status = status;
this.count = count;
this.message = message;
this.primaryKeys = primaryKeys;
}
/*******************************************************************************
**
*******************************************************************************/
public ProcessSummaryLine(Status status, Integer count, String message)
{
this.status = status;
this.count = count;
this.message = message;
}
/*******************************************************************************
** Getter for status
**
*******************************************************************************/
public Status getStatus()
{
return status;
}
/*******************************************************************************
** Setter for status
**
*******************************************************************************/
public void setStatus(Status status)
{
this.status = status;
}
/*******************************************************************************
** Getter for primaryKeys
**
*******************************************************************************/
public List<Serializable> getPrimaryKeys()
{
return primaryKeys;
}
/*******************************************************************************
** Setter for primaryKeys
**
*******************************************************************************/
public void setPrimaryKeys(ArrayList<Serializable> primaryKeys)
{
this.primaryKeys = primaryKeys;
}
/*******************************************************************************
** Getter for count
**
*******************************************************************************/
public Integer getCount()
{
return count;
}
/*******************************************************************************
** Setter for count
**
*******************************************************************************/
public void setCount(Integer count)
{
this.count = count;
}
/*******************************************************************************
** Getter for message
**
*******************************************************************************/
public String getMessage()
{
return message;
}
/*******************************************************************************
** Setter for message
**
*******************************************************************************/
public void setMessage(String message)
{
this.message = message;
}
/*******************************************************************************
**
*******************************************************************************/
public void incrementCount()
{
if(count == null)
{
count = 0;
}
count++;
}
/*******************************************************************************
**
*******************************************************************************/
public void incrementCountAndAddPrimaryKey(Serializable primaryKey)
{
incrementCount();
if(primaryKeys == null)
{
primaryKeys = new ArrayList<>();
}
primaryKeys.add(primaryKey);
}
/*******************************************************************************
**
*******************************************************************************/
public void addSelfToListIfAnyCount(ArrayList<ProcessSummaryLine> rs)
{
if(count != null && count > 0)
{
rs.add(this);
}
}
}

View File

@ -354,7 +354,30 @@ public class RunBackendStepInput extends AbstractActionInput
*******************************************************************************/
public String getValueString(String fieldName)
{
return ((String) getValue(fieldName));
return (ValueUtils.getValueAsString(getValue(fieldName)));
}
/*******************************************************************************
** Getter for a single field's value
**
*******************************************************************************/
public Boolean getValueBoolean(String fieldName)
{
return (ValueUtils.getValueAsBoolean(getValue(fieldName)));
}
/*******************************************************************************
** Getter for a single field's value as a primitive boolean
**
*******************************************************************************/
public boolean getValue_boolean(String fieldName)
{
Boolean valueAsBoolean = ValueUtils.getValueAsBoolean(getValue(fieldName));
return (valueAsBoolean != null && valueAsBoolean);
}

View File

@ -0,0 +1,13 @@
package com.kingsrook.qqq.backend.core.model.actions.processes;
/*******************************************************************************
** Simple status enum - initially for statusesqqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/processes/Status.java in process status lines.
*******************************************************************************/
public enum Status
{
OK,
WARNING,
ERROR,
INFO
}

View File

@ -34,6 +34,7 @@ import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import org.apache.commons.lang.SerializationUtils;
/*******************************************************************************
@ -87,16 +88,60 @@ public class QRecord implements Serializable
/*******************************************************************************
** Copy constructor.
** TODO ... should this do deep copies?
**
*******************************************************************************/
@SuppressWarnings("unchecked")
public QRecord(QRecord record)
{
this.tableName = record.tableName;
this.recordLabel = record.recordLabel;
this.values = record.values;
this.displayValues = record.displayValues;
this.backendDetails = record.backendDetails;
this.errors = record.errors;
this.values = doDeepCopy(record.values);
this.displayValues = doDeepCopy(record.displayValues);
this.backendDetails = doDeepCopy(record.backendDetails);
this.errors = doDeepCopy(record.errors);
}
/*******************************************************************************
**
*******************************************************************************/
@SuppressWarnings({ "rawtypes", "unchecked" })
private Map doDeepCopy(Map map)
{
if(map == null)
{
return (null);
}
if(map instanceof Serializable serializableMap)
{
return (Map) SerializationUtils.clone(serializableMap);
}
return (new LinkedHashMap(map));
}
/*******************************************************************************
**
*******************************************************************************/
@SuppressWarnings({ "rawtypes", "unchecked" })
private List doDeepCopy(List list)
{
if(list == null)
{
return (null);
}
if(list instanceof Serializable serializableList)
{
return (List) SerializationUtils.clone(serializableList);
}
return (new ArrayList(list));
}
@ -142,7 +187,6 @@ public class QRecord implements Serializable
/*******************************************************************************
**
*******************************************************************************/
@ -209,6 +253,7 @@ public class QRecord implements Serializable
}
/*******************************************************************************
** Fluent setter for recordLabel
**

View File

@ -75,11 +75,10 @@ public class QBackendMetaData
/*******************************************************************************
** Fluent setter, returning generically, to help sub-class fluent flows
*******************************************************************************/
@SuppressWarnings("unchecked")
public <T extends QBackendMetaData> T withName(String name)
public QBackendMetaData withName(String name)
{
this.name = name;
return (T) this;
return this;
}

View File

@ -35,6 +35,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.modules.authentication.metadata.QAuthenticationMetaData;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
/*******************************************************************************
@ -129,6 +130,10 @@ public class QInstance
*******************************************************************************/
public void addBackend(String name, QBackendMetaData backend)
{
if(!StringUtils.hasContent(name))
{
throw (new IllegalArgumentException("Attempted to add a backend without a name."));
}
if(this.backends.containsKey(name))
{
throw (new IllegalArgumentException("Attempted to add a second backend with name: " + name));
@ -163,6 +168,10 @@ public class QInstance
*******************************************************************************/
public void addTable(String name, QTableMetaData table)
{
if(!StringUtils.hasContent(name))
{
throw (new IllegalArgumentException("Attempted to add a table without a name."));
}
if(this.tables.containsKey(name))
{
throw (new IllegalArgumentException("Attempted to add a second table with name: " + name));
@ -202,6 +211,10 @@ public class QInstance
*******************************************************************************/
public void addPossibleValueSource(String name, QPossibleValueSource possibleValueSource)
{
if(!StringUtils.hasContent(name))
{
throw (new IllegalArgumentException("Attempted to add a possibleValueSource without a name."));
}
if(this.possibleValueSources.containsKey(name))
{
throw (new IllegalArgumentException("Attempted to add a second possibleValueSource with name: " + name));
@ -252,6 +265,10 @@ public class QInstance
*******************************************************************************/
public void addProcess(String name, QProcessMetaData process)
{
if(!StringUtils.hasContent(name))
{
throw (new IllegalArgumentException("Attempted to add a process without a name."));
}
if(this.processes.containsKey(name))
{
throw (new IllegalArgumentException("Attempted to add a second process with name: " + name));
@ -286,6 +303,10 @@ public class QInstance
*******************************************************************************/
public void addApp(String name, QAppMetaData app)
{
if(!StringUtils.hasContent(name))
{
throw (new IllegalArgumentException("Attempted to add an app without a name."));
}
if(this.apps.containsKey(name))
{
throw (new IllegalArgumentException("Attempted to add a second app with name: " + name));

View File

@ -28,7 +28,12 @@ package com.kingsrook.qqq.backend.core.model.metadata.processes;
public enum QComponentType
{
HELP_TEXT,
BULK_EDIT_FORM;
BULK_EDIT_FORM,
VALIDATION_REVIEW_SCREEN,
EDIT_FORM,
VIEW_FORM,
RECORD_LIST,
PROCESS_SUMMARY_RESULTS;
///////////////////////////////////////////////////////////////////////////
// keep these values in sync with QComponentType.ts in qqq-frontend-core //
///////////////////////////////////////////////////////////////////////////

View File

@ -33,7 +33,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
*******************************************************************************/
public class QFunctionOutputMetaData
{
private QRecordListMetaData recordListMetaData;
private QRecordListMetaData recordListMetaData;
private List<QFieldMetaData> fieldList;
@ -106,11 +106,12 @@ public class QFunctionOutputMetaData
/*******************************************************************************
** Setter for fieldList
** Fluently add a field to the list
**
*******************************************************************************/
public QFunctionOutputMetaData addField(QFieldMetaData field)
public QFunctionOutputMetaData withField(QFieldMetaData field)
{
if(this.fieldList == null)
{

View File

@ -23,7 +23,9 @@ package com.kingsrook.qqq.backend.core.model.metadata.processes;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.layout.QAppChildMetaData;
@ -41,7 +43,8 @@ public class QProcessMetaData implements QAppChildMetaData
private String tableName;
private boolean isHidden = false;
private List<QStepMetaData> stepList;
private List<QStepMetaData> stepList; // these are the steps that are ran, by-default, in the order they are ran in
private Map<String, QStepMetaData> steps; // this is the full map of possible steps
private String parentAppName;
private QIcon icon;
@ -167,14 +170,18 @@ public class QProcessMetaData implements QAppChildMetaData
*******************************************************************************/
public QProcessMetaData withStepList(List<QStepMetaData> stepList)
{
this.stepList = stepList;
if(stepList != null)
{
stepList.forEach(this::addStep);
}
return (this);
}
/*******************************************************************************
** Setter for stepList
** add a step to the stepList and map
**
*******************************************************************************/
public QProcessMetaData addStep(QStepMetaData step)
@ -184,6 +191,30 @@ public class QProcessMetaData implements QAppChildMetaData
this.stepList = new ArrayList<>();
}
this.stepList.add(step);
if(this.steps == null)
{
this.steps = new HashMap<>();
}
this.steps.put(step.getName(), step);
return (this);
}
/*******************************************************************************
** add a step ONLY to the step map - NOT the list w/ default execution order.
**
*******************************************************************************/
public QProcessMetaData addOptionalStep(QStepMetaData step)
{
if(this.steps == null)
{
this.steps = new HashMap<>();
}
this.steps.put(step.getName(), step);
return (this);
}
@ -205,15 +236,7 @@ public class QProcessMetaData implements QAppChildMetaData
*******************************************************************************/
public QStepMetaData getStep(String stepName)
{
for(QStepMetaData step : stepList)
{
if(step.getName().equals(stepName))
{
return (step);
}
}
return (null);
return (steps.get(stepName));
}
@ -245,9 +268,9 @@ public class QProcessMetaData implements QAppChildMetaData
public List<QFieldMetaData> getInputFields()
{
List<QFieldMetaData> rs = new ArrayList<>();
if(stepList != null)
if(steps != null)
{
for(QStepMetaData step : stepList)
for(QStepMetaData step : steps.values())
{
rs.addAll(step.getInputFields());
}
@ -264,9 +287,9 @@ public class QProcessMetaData implements QAppChildMetaData
public List<QFieldMetaData> getOutputFields()
{
List<QFieldMetaData> rs = new ArrayList<>();
if(stepList != null)
if(steps != null)
{
for(QStepMetaData step : stepList)
for(QStepMetaData step : steps.values())
{
rs.addAll(step.getOutputFields());
}

View File

@ -83,7 +83,7 @@ public class BasicETLProcess
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData(FIELD_DESTINATION_TABLE, QFieldType.STRING)))
.withOutputMetaData(new QFunctionOutputMetaData()
.addField(new QFieldMetaData(FIELD_RECORD_COUNT, QFieldType.INTEGER)));
.withField(new QFieldMetaData(FIELD_RECORD_COUNT, QFieldType.INTEGER)));
return new QProcessMetaData()
.withName(PROCESS_NAME)

View File

@ -66,7 +66,7 @@ public class StreamedETLProcess
.withField(new QFieldMetaData(FIELD_MAPPING_JSON, QFieldType.STRING))
.withField(new QFieldMetaData(FIELD_DESTINATION_TABLE, QFieldType.STRING)))
.withOutputMetaData(new QFunctionOutputMetaData()
.addField(new QFieldMetaData(FIELD_RECORD_COUNT, QFieldType.INTEGER)));
.withField(new QFieldMetaData(FIELD_RECORD_COUNT, QFieldType.INTEGER)));
return new QProcessMetaData()
.withName(PROCESS_NAME)

View File

@ -0,0 +1,19 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.ArrayList;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
/*******************************************************************************
**
*******************************************************************************/
public interface ProcessSummaryProviderInterface
{
/*******************************************************************************
**
*******************************************************************************/
ArrayList<ProcessSummaryLine> getProcessSummary(boolean isForResultScreen);
}

View File

@ -33,7 +33,6 @@ import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess;
/*******************************************************************************
@ -80,7 +79,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
() -> (consumeRecordsFromPipe(recordPipe, transformStep, loadStep, runBackendStepInput, runBackendStepOutput, loadedRecordList))
);
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT, recordCount);
runBackendStepOutput.setRecords(loadedRecordList);
/////////////////////
@ -90,6 +89,15 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
{
transaction.get().commit();
}
if(transformStep instanceof ProcessSummaryProviderInterface processSummaryProvider)
{
//////////////////////////////////////////////////////////////////////////////////////////////
// get the process summary from the ... transform step? the load step? each knows some... //
// TODO!! //
//////////////////////////////////////////////////////////////////////////////////////////////
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_PROCESS_SUMMARY, processSummaryProvider.getProcessSummary(true));
}
}
catch(Exception e)
{
@ -121,7 +129,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
*******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, AbstractLoadStep loadStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> loadedRecordList) throws QException
{
Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLProcess.FIELD_RECORD_COUNT);
Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT);
if(totalRows != null)
{
runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows);

View File

@ -32,6 +32,8 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInpu
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/*******************************************************************************
@ -39,6 +41,8 @@ import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.Str
*******************************************************************************/
public class StreamedETLPreviewStep extends BaseStreamedETLStep implements BackendStep
{
private static final Logger LOG = LogManager.getLogger(StreamedETLPreviewStep.class);
/*******************************************************************************
@ -48,29 +52,72 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
@SuppressWarnings("checkstyle:indentation")
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
RecordPipe recordPipe = new RecordPipe();
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
extractStep.setLimit(PROCESS_OUTPUT_RECORD_LIST_LIMIT); // todo - make this an input?
extractStep.setRecordPipe(recordPipe);
Integer limit = PROCESS_OUTPUT_RECORD_LIST_LIMIT; // todo - use a field instead of hard-coded here?
//////////////////////////////////////////////////////////////////////////////////////////////////////////
// if the do-full-validation flag has already been set, then do the validation step instead of this one //
//////////////////////////////////////////////////////////////////////////////////////////////////////////
boolean supportsFullValidation = runBackendStepInput.getValue_boolean(StreamedETLWithFrontendProcess.FIELD_SUPPORTS_FULL_VALIDATION);
boolean doFullValidation = runBackendStepInput.getValue_boolean(StreamedETLWithFrontendProcess.FIELD_DO_FULL_VALIDATION);
if(supportsFullValidation && doFullValidation)
{
skipToValidateStep(runBackendStepOutput);
return;
}
///////////////////////////////////////////
// request a count from the extract step //
///////////////////////////////////////////
Integer recordCount = extractStep.doCount(runBackendStepInput);
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
Integer recordCount = extractStep.doCount(runBackendStepInput);
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// if the count is less than the normal limit here, and this process supports validation, then go straight to the validation step //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo - maybe some future version we do this - maybe based on a user-preference
// if(supportsFullValidation && recordCount <= limit)
// {
// skipToValidateStep(runBackendStepOutput);
// return;
// }
////////////////////////////////////////////////////////
// proceed with a doing a limited extract & transform //
////////////////////////////////////////////////////////
RecordPipe recordPipe = new RecordPipe();
extractStep.setLimit(limit);
extractStep.setRecordPipe(recordPipe);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
List<QRecord> transformedRecordList = new ArrayList<>();
List<QRecord> previewRecordList = new ArrayList<>();
new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) ->
{
extractStep.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput);
},
() -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, transformedRecordList))
() -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, previewRecordList))
);
runBackendStepOutput.setRecords(transformedRecordList);
runBackendStepOutput.setRecords(previewRecordList);
}
/*******************************************************************************
**
*******************************************************************************/
private void skipToValidateStep(RunBackendStepOutput runBackendStepOutput)
{
LOG.info("Skipping to validation step");
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_DO_FULL_VALIDATION, true);
ArrayList<String> stepList = new ArrayList<>(runBackendStepOutput.getProcessState().getStepList());
System.out.println("Step list pre: " + stepList);
stepList.removeIf(s -> s.equals(StreamedETLWithFrontendProcess.STEP_NAME_REVIEW));
stepList.add(stepList.indexOf(StreamedETLWithFrontendProcess.STEP_NAME_VALIDATE) + 1, StreamedETLWithFrontendProcess.STEP_NAME_REVIEW);
runBackendStepOutput.getProcessState().setStepList(stepList);
System.out.println("Step list post: " + stepList);
}

View File

@ -0,0 +1,155 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/*******************************************************************************
** Backend step to do a full validation of a streamed ETL job
*******************************************************************************/
public class StreamedETLValidateStep extends BaseStreamedETLStep implements BackendStep
{
private static final Logger LOG = LogManager.getLogger(StreamedETLValidateStep.class);
private int currentRowCount = 1;
/*******************************************************************************
**
*******************************************************************************/
@Override
@SuppressWarnings("checkstyle:indentation")
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
/////////////////////////////////////////////////////////////////////
// check if we are supported in this process - if not, return noop //
/////////////////////////////////////////////////////////////////////
boolean supportsFullValidation = runBackendStepInput.getValue_boolean(StreamedETLWithFrontendProcess.FIELD_SUPPORTS_FULL_VALIDATION);
if(!supportsFullValidation)
{
LOG.info("Process does not support validation, so skipping validation step");
return;
}
////////////////////////////////////////////////////////////////////////////////
// check if we've been requested to run in this process - if not, return noop //
////////////////////////////////////////////////////////////////////////////////
boolean doFullValidation = runBackendStepInput.getValue_boolean(StreamedETLWithFrontendProcess.FIELD_DO_FULL_VALIDATION);
if(!doFullValidation)
{
LOG.info("Not requested to do full validation, so skipping validation step");
return;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
// if we're proceeding with full validation, move the review step to be after validation in the step list //
////////////////////////////////////////////////////////////////////////////////////////////////////////////
ArrayList<String> stepList = new ArrayList<>(runBackendStepOutput.getProcessState().getStepList());
System.out.println("Step list pre: " + stepList);
stepList.removeIf(s -> s.equals(StreamedETLWithFrontendProcess.STEP_NAME_REVIEW));
stepList.add(stepList.indexOf(StreamedETLWithFrontendProcess.STEP_NAME_VALIDATE) + 1, StreamedETLWithFrontendProcess.STEP_NAME_REVIEW);
runBackendStepOutput.getProcessState().setStepList(stepList);
System.out.println("Step list post: " + stepList);
//////////////////////////////////////////////////////////
// basically repeat the preview step, but with no limit //
//////////////////////////////////////////////////////////
RecordPipe recordPipe = new RecordPipe();
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
extractStep.setLimit(null);
extractStep.setRecordPipe(recordPipe);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
if(!(transformStep instanceof ProcessSummaryProviderInterface processSummaryProvider))
{
throw (new QException("Transform Step " + transformStep.getClass().getName() + " does not implement ProcessSummaryProviderInterface."));
}
List<QRecord> previewRecordList = new ArrayList<>();
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Preview>ValidateStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) ->
{
extractStep.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput);
},
() -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, previewRecordList))
);
runBackendStepOutput.setRecords(previewRecordList);
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT, recordCount);
//////////////////////////////////////////////////////
// get the process summary from the validation step //
//////////////////////////////////////////////////////
runBackendStepOutput.addValue(StreamedETLWithFrontendProcess.FIELD_VALIDATION_SUMMARY, processSummaryProvider.getProcessSummary(false));
}
/*******************************************************************************
**
*******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> previewRecordList) throws QException
{
Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT);
if(totalRows != null)
{
runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows);
}
///////////////////////////////////
// get the records from the pipe //
///////////////////////////////////
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
/////////////////////////////////////////////////////
// pass the records through the transform function //
/////////////////////////////////////////////////////
transformStep.setInputRecordPage(qRecords);
transformStep.run(runBackendStepInput, runBackendStepOutput);
///////////////////////////////////////////////////////
// copy a small number of records to the output list //
///////////////////////////////////////////////////////
int i = 0;
while(previewRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < transformStep.getOutputRecordPage().size())
{
previewRecordList.add(transformStep.getOutputRecordPage().get(i++));
}
currentRowCount += qRecords.size();
return (qRecords.size());
}
}

View File

@ -22,11 +22,17 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QComponentType;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendComponentMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionInputMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionOutputMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
@ -51,60 +57,104 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
*******************************************************************************/
public class StreamedETLWithFrontendProcess
{
public static final String PROCESS_NAME = "etl.streamedWithFrontend";
public static final String STEP_NAME_PREVIEW = "preview";
public static final String STEP_NAME_REVIEW = "review";
public static final String STEP_NAME_VALIDATE = "validate";
public static final String STEP_NAME_EXECUTE = "execute";
public static final String STEP_NAME_RESULT = "result";
public static final String STEP_NAME_PREVIEW = "preview";
public static final String STEP_NAME_REVIEW = "review";
public static final String STEP_NAME_EXECUTE = "execute";
public static final String STEP_NAME_RESULT = "result";
public static final String FIELD_EXTRACT_CODE = "extract"; // QCodeReference, of AbstractExtractStep
public static final String FIELD_TRANSFORM_CODE = "transform"; // QCodeReference, of AbstractTransformStep
public static final String FIELD_LOAD_CODE = "load"; // QCodeReference, of AbstractLoadStep
public static final String FIELD_EXTRACT_CODE = "extract";
public static final String FIELD_TRANSFORM_CODE = "transform";
public static final String FIELD_LOAD_CODE = "load";
public static final String FIELD_SOURCE_TABLE = "sourceTable"; // String
public static final String FIELD_DESTINATION_TABLE = "destinationTable"; // String
public static final String FIELD_RECORD_COUNT = "recordCount"; // Integer
public static final String FIELD_DEFAULT_QUERY_FILTER = "defaultQueryFilter"; // QQueryFilter or String (json, of q QQueryFilter)
public static final String FIELD_SOURCE_TABLE = "sourceTable";
public static final String FIELD_DEFAULT_QUERY_FILTER = "defaultQueryFilter";
public static final String FIELD_DESTINATION_TABLE = "destinationTable";
public static final String FIELD_SUPPORTS_FULL_VALIDATION = "supportsFullValidation"; // Boolean
public static final String FIELD_DO_FULL_VALIDATION = "doFullValidation"; // Boolean
public static final String FIELD_VALIDATION_SUMMARY = "validationSummary"; // List<ProcessSummaryLine>
public static final String FIELD_PROCESS_SUMMARY = "processResults"; // List<ProcessSummaryLine>
/*******************************************************************************
**
*******************************************************************************/
public QProcessMetaData defineProcessMetaData(
public static QProcessMetaData defineProcessMetaData(
String sourceTableName,
String destinationTableName,
Class<? extends AbstractExtractStep> extractStepClass,
Class<? extends AbstractTransformStep> transformStepClass,
Class<? extends AbstractLoadStep> loadStepClass
)
{
Map<String, Serializable> defaultFieldValues = new HashMap<>();
defaultFieldValues.put(FIELD_SOURCE_TABLE, sourceTableName);
defaultFieldValues.put(FIELD_DESTINATION_TABLE, destinationTableName);
return defineProcessMetaData(extractStepClass, transformStepClass, loadStepClass, defaultFieldValues);
}
/*******************************************************************************
** @param defaultFieldValues - expected to possibly contain values for the following field names:
** - FIELD_SOURCE_TABLE
** - FIELD_DESTINATION_TABLE
** - FIELD_SUPPORTS_FULL_VALIDATION
** - FIELD_DEFAULT_QUERY_FILTER
** - FIELD_DO_FULL_VALIDATION
*******************************************************************************/
public static QProcessMetaData defineProcessMetaData(
Class<? extends AbstractExtractStep> extractStepClass,
Class<? extends AbstractTransformStep> transformStepClass,
Class<? extends AbstractLoadStep> loadStepClass,
Map<String, Serializable> defaultFieldValues
)
{
QStepMetaData previewStep = new QBackendStepMetaData()
.withName(STEP_NAME_PREVIEW)
.withCode(new QCodeReference(StreamedETLPreviewStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_SOURCE_TABLE).withDefaultValue(sourceTableName))
.withField(new QFieldMetaData().withName(FIELD_DEFAULT_QUERY_FILTER))
.withField(new QFieldMetaData().withName(FIELD_SOURCE_TABLE).withDefaultValue(defaultFieldValues.get(FIELD_SOURCE_TABLE)))
.withField(new QFieldMetaData().withName(FIELD_SUPPORTS_FULL_VALIDATION).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_SUPPORTS_FULL_VALIDATION, false)))
.withField(new QFieldMetaData().withName(FIELD_DEFAULT_QUERY_FILTER).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
.withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE).withDefaultValue(new QCodeReference(extractStepClass)))
.withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE).withDefaultValue(new QCodeReference(transformStepClass))));
.withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE).withDefaultValue(new QCodeReference(transformStepClass)))
);
QFrontendStepMetaData reviewStep = new QFrontendStepMetaData()
.withName(STEP_NAME_REVIEW);
.withName(STEP_NAME_REVIEW)
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.VALIDATION_REVIEW_SCREEN));
QStepMetaData validateStep = new QBackendStepMetaData()
.withName(STEP_NAME_VALIDATE)
.withCode(new QCodeReference(StreamedETLValidateStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_DO_FULL_VALIDATION).withDefaultValue(defaultFieldValues.get(FIELD_DO_FULL_VALIDATION))))
.withOutputMetaData(new QFunctionOutputMetaData()
.withField(new QFieldMetaData().withName(FIELD_VALIDATION_SUMMARY))
);
QStepMetaData executeStep = new QBackendStepMetaData()
.withName(STEP_NAME_EXECUTE)
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_DESTINATION_TABLE).withDefaultValue(destinationTableName))
.withField(new QFieldMetaData().withName(FIELD_LOAD_CODE).withDefaultValue(new QCodeReference(loadStepClass))));
.withField(new QFieldMetaData().withName(FIELD_DESTINATION_TABLE).withDefaultValue(defaultFieldValues.get(FIELD_DESTINATION_TABLE)))
.withField(new QFieldMetaData().withName(FIELD_LOAD_CODE).withDefaultValue(new QCodeReference(loadStepClass))))
.withOutputMetaData(new QFunctionOutputMetaData()
.withField(new QFieldMetaData().withName(FIELD_PROCESS_SUMMARY))
);
QFrontendStepMetaData resultStep = new QFrontendStepMetaData()
.withName(STEP_NAME_RESULT);
.withName(STEP_NAME_RESULT)
.withComponent(new QFrontendComponentMetaData().withType(QComponentType.PROCESS_SUMMARY_RESULTS));
return new QProcessMetaData()
.withName(PROCESS_NAME)
.addStep(previewStep)
.addStep(reviewStep)
.addStep(validateStep)
.addStep(executeStep)
.addStep(resultStep);
}