Compare commits

..

1 Commits

Author SHA1 Message Date
66f9d1b500 CE-889 - improvements for streaming etl pipes:
allow StreamedETLWithFrontendProcess pipe capacity to come from field 'recordPipeCapacity';  also use either field-based on transform-step-based pipe capacity in Validate step as well as Execute step;

in AsyncRecordPipeLoop, if pipe capacity is less than minRecordsToConsume, then set minRecordsToConsume down to pipe capacity.

change AbstractLoadStep and AbstractTransformStep for StreamedETLWithFrontendProcesses to no implement BackendStep, and as such to (eventually) require a runOnePage method, rather than run (run marked as @Deprecated until apps can migrate);
2024-03-03 16:45:28 -06:00
12 changed files with 126 additions and 96 deletions

View File

@ -83,6 +83,15 @@ public class AsyncRecordPipeLoop
long jobStartTime = System.currentTimeMillis(); long jobStartTime = System.currentTimeMillis();
boolean everCalledConsumer = false; boolean everCalledConsumer = false;
////////////////////////////////////////////////////////////////////////////
// in case the pipe capacity has been made very small (useful in tests!), //
// then make the minRecordsToConsume match it. //
////////////////////////////////////////////////////////////////////////////
if(recordPipe.getCapacity() < minRecordsToConsume)
{
minRecordsToConsume = recordPipe.getCapacity();
}
while(jobState.equals(AsyncJobState.RUNNING)) while(jobState.equals(AsyncJobState.RUNNING))
{ {
if(recordPipe.countAvailableRecords() < minRecordsToConsume) if(recordPipe.countAvailableRecords() < minRecordsToConsume)

View File

@ -273,7 +273,7 @@ public class GenerateReportAction
RunBackendStepOutput transformStepOutput = null; RunBackendStepOutput transformStepOutput = null;
if(tableView != null && tableView.getRecordTransformStep() != null) if(tableView != null && tableView.getRecordTransformStep() != null)
{ {
transformStep = QCodeLoader.getBackendStep(AbstractTransformStep.class, tableView.getRecordTransformStep()); transformStep = QCodeLoader.getAdHoc(AbstractTransformStep.class, tableView.getRecordTransformStep());
transformStepInput = new RunBackendStepInput(); transformStepInput = new RunBackendStepInput();
transformStepInput.setValues(reportInput.getInputValues()); transformStepInput.setValues(reportInput.getInputValues());

View File

@ -44,7 +44,8 @@ public class RecordPipe
private static final long BLOCKING_SLEEP_MILLIS = 100; private static final long BLOCKING_SLEEP_MILLIS = 100;
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(1_000); private int capacity = 1_000;
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(capacity);
private boolean isTerminated = false; private boolean isTerminated = false;
@ -72,6 +73,7 @@ public class RecordPipe
*******************************************************************************/ *******************************************************************************/
public RecordPipe(Integer overrideCapacity) public RecordPipe(Integer overrideCapacity)
{ {
this.capacity = overrideCapacity;
queue = new ArrayBlockingQueue<>(overrideCapacity); queue = new ArrayBlockingQueue<>(overrideCapacity);
} }
@ -213,4 +215,14 @@ public class RecordPipe
this.postRecordActions = postRecordActions; this.postRecordActions = postRecordActions;
} }
/*******************************************************************************
** Getter for capacity
**
*******************************************************************************/
public int getCapacity()
{
return capacity;
}
} }

View File

@ -136,8 +136,6 @@ public class ReplaceAction extends AbstractQActionFunction<ReplaceInput, Replace
UpdateOutput updateOutput = new UpdateAction().execute(updateInput); UpdateOutput updateOutput = new UpdateAction().execute(updateInput);
output.setUpdateOutput(updateOutput); output.setUpdateOutput(updateOutput);
if(input.getPerformDeletes())
{
QQueryFilter deleteFilter = new QQueryFilter(new QFilterCriteria(primaryKeyField, QCriteriaOperator.NOT_IN, primaryKeysToKeep)); QQueryFilter deleteFilter = new QQueryFilter(new QFilterCriteria(primaryKeyField, QCriteriaOperator.NOT_IN, primaryKeysToKeep));
if(input.getFilter() != null) if(input.getFilter() != null)
{ {
@ -151,7 +149,6 @@ public class ReplaceAction extends AbstractQActionFunction<ReplaceInput, Replace
deleteInput.setOmitDmlAudit(input.getOmitDmlAudit()); deleteInput.setOmitDmlAudit(input.getOmitDmlAudit());
DeleteOutput deleteOutput = new DeleteAction().execute(deleteInput); DeleteOutput deleteOutput = new DeleteAction().execute(deleteInput);
output.setDeleteOutput(deleteOutput); output.setDeleteOutput(deleteOutput);
}
if(weOwnTheTransaction) if(weOwnTheTransaction)
{ {

View File

@ -39,7 +39,6 @@ public class ReplaceInput extends AbstractTableActionInput
private UniqueKey key; private UniqueKey key;
private List<QRecord> records; private List<QRecord> records;
private QQueryFilter filter; private QQueryFilter filter;
private boolean performDeletes = true;
private boolean omitDmlAudit = false; private boolean omitDmlAudit = false;
@ -208,35 +207,4 @@ public class ReplaceInput extends AbstractTableActionInput
return (this); return (this);
} }
/*******************************************************************************
** Getter for performDeletes
*******************************************************************************/
public boolean getPerformDeletes()
{
return (this.performDeletes);
}
/*******************************************************************************
** Setter for performDeletes
*******************************************************************************/
public void setPerformDeletes(boolean performDeletes)
{
this.performDeletes = performDeletes;
}
/*******************************************************************************
** Fluent setter for performDeletes
*******************************************************************************/
public ReplaceInput withPerformDeletes(boolean performDeletes)
{
this.performDeletes = performDeletes;
return (this);
}
} }

View File

@ -34,7 +34,6 @@ import java.util.Map;
public abstract class QWidgetData public abstract class QWidgetData
{ {
private String label; private String label;
private String sublabel;
private String footerHTML; private String footerHTML;
private List<String> dropdownNameList; private List<String> dropdownNameList;
private List<String> dropdownLabelList; private List<String> dropdownLabelList;
@ -52,7 +51,6 @@ public abstract class QWidgetData
private List<List<Serializable>> csvData; private List<List<Serializable>> csvData;
/******************************************************************************* /*******************************************************************************
** Getter for type ** Getter for type
*******************************************************************************/ *******************************************************************************/
@ -358,35 +356,4 @@ public abstract class QWidgetData
return (this); return (this);
} }
/*******************************************************************************
** Getter for sublabel
*******************************************************************************/
public String getSublabel()
{
return (this.sublabel);
}
/*******************************************************************************
** Setter for sublabel
*******************************************************************************/
public void setSublabel(String sublabel)
{
this.sublabel = sublabel;
}
/*******************************************************************************
** Fluent setter for sublabel
*******************************************************************************/
public QWidgetData withSublabel(String sublabel)
{
this.sublabel = sublabel;
return (this);
}
} }

View File

@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
import java.util.Optional; import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
@ -38,11 +37,11 @@ import com.kingsrook.qqq.backend.core.model.session.QSession;
** should be written to the output object's Records, noting that when running ** should be written to the output object's Records, noting that when running
** as a streamed-ETL process, those input & output objects will be instances of ** as a streamed-ETL process, those input & output objects will be instances of
** the StreamedBackendStep{Input,Output} classes, that will be associated with ** the StreamedBackendStep{Input,Output} classes, that will be associated with
** a page of records flowing thorugh a pipe. ** a page of records flowing through a pipe.
** **
** Also - use the transaction member variable!!! ** Also - use the transaction member variable!!!
*******************************************************************************/ *******************************************************************************/
public abstract class AbstractLoadStep implements BackendStep public abstract class AbstractLoadStep
{ {
private Optional<QBackendTransaction> transaction = Optional.empty(); private Optional<QBackendTransaction> transaction = Optional.empty();
protected QSession session; protected QSession session;
@ -51,6 +50,25 @@ public abstract class AbstractLoadStep implements BackendStep
/*******************************************************************************
**
*******************************************************************************/
@Deprecated
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
runOnePage(runBackendStepInput, runBackendStepOutput);
}
/*******************************************************************************
** todo - make abstract when run is deleted.
*******************************************************************************/
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
}
/******************************************************************************* /*******************************************************************************
** Allow subclasses to do an action before the run is complete - before any ** Allow subclasses to do an action before the run is complete - before any
** pages of records are passed in. ** pages of records are passed in.

View File

@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
import java.util.Optional; import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
@ -40,12 +39,33 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp
** a page of records flowing through a pipe. ** a page of records flowing through a pipe.
** **
*******************************************************************************/ *******************************************************************************/
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface public abstract class AbstractTransformStep implements ProcessSummaryProviderInterface
{ {
private Optional<QBackendTransaction> transaction = Optional.empty(); private Optional<QBackendTransaction> transaction = Optional.empty();
/*******************************************************************************
**
*******************************************************************************/
@Deprecated
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
runOnePage(runBackendStepInput, runBackendStepOutput);
}
/*******************************************************************************
** todo - make abstract when run is deleted.
*******************************************************************************/
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
}
/******************************************************************************* /*******************************************************************************
** Allow subclasses to do an action before the run is complete - before any ** Allow subclasses to do an action before the run is complete - before any
** pages of records are passed in. ** pages of records are passed in.

View File

@ -63,7 +63,7 @@ public class BaseStreamedETLStep
protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput) protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput)
{ {
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE); QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference)); return (QCodeLoader.getAdHoc(AbstractTransformStep.class, codeReference));
} }
@ -74,7 +74,7 @@ public class BaseStreamedETLStep
protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput) protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput)
{ {
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE); QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference)); return (QCodeLoader.getAdHoc(AbstractLoadStep.class, codeReference));
} }

View File

@ -83,7 +83,15 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
// before it can put more records in. // // before it can put more records in. //
///////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////
RecordPipe recordPipe; RecordPipe recordPipe;
Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput); Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
if(overrideRecordPipeCapacity != null)
{
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
}
else
{
overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
if(overrideRecordPipeCapacity != null) if(overrideRecordPipeCapacity != null)
{ {
recordPipe = new RecordPipe(overrideRecordPipeCapacity); recordPipe = new RecordPipe(overrideRecordPipeCapacity);
@ -102,6 +110,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
recordPipe = new RecordPipe(); recordPipe = new RecordPipe();
} }
} }
}
extractStep.setRecordPipe(recordPipe); extractStep.setRecordPipe(recordPipe);
extractStep.preRun(runBackendStepInput, runBackendStepOutput); extractStep.preRun(runBackendStepInput, runBackendStepOutput);

View File

@ -81,13 +81,41 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
// basically repeat the preview step, but with no limit // // basically repeat the preview step, but with no limit //
////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////
runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records"); runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records");
RecordPipe recordPipe = new RecordPipe();
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
//////////////////////////////////////////////////////////////////////
// let the transform step override the capacity for the record pipe //
//////////////////////////////////////////////////////////////////////
RecordPipe recordPipe;
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
if(overrideRecordPipeCapacity != null)
{
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
}
else
{
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
if(overrideRecordPipeCapacity != null)
{
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
}
else
{
recordPipe = new RecordPipe();
}
}
/////////////////////////////
// set up the extract step //
/////////////////////////////
extractStep.setLimit(null); extractStep.setLimit(null);
extractStep.setRecordPipe(recordPipe); extractStep.setRecordPipe(recordPipe);
extractStep.preRun(runBackendStepInput, runBackendStepOutput); extractStep.preRun(runBackendStepInput, runBackendStepOutput);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
transformStep.preRun(runBackendStepInput, runBackendStepOutput); transformStep.preRun(runBackendStepInput, runBackendStepOutput);
List<QRecord> previewRecordList = new ArrayList<>(); List<QRecord> previewRecordList = new ArrayList<>();

View File

@ -151,6 +151,7 @@ public class StreamedETLWithFrontendProcess
.withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER))) .withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass))) .withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass)))
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass))) .withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass)))
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractTransformStep.class.getName()))
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT))) .withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
.withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS))) .withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS)))
); );
@ -170,7 +171,8 @@ public class StreamedETLWithFrontendProcess
.withName(STEP_NAME_EXECUTE) .withName(STEP_NAME_EXECUTE)
.withCode(new QCodeReference(StreamedETLExecuteStep.class)) .withCode(new QCodeReference(StreamedETLExecuteStep.class))
.withInputData(new QFunctionInputMetaData() .withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass)))) .withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass)))
.withField(new QFieldMetaData(FIELD_LOAD_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractLoadStep.class.getName())))
.withOutputMetaData(new QFunctionOutputMetaData() .withOutputMetaData(new QFunctionOutputMetaData()
.withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING)) .withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING))
); );