mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
CE-889 - improvements for streaming etl pipes:
allow StreamedETLWithFrontendProcess pipe capacity to come from field 'recordPipeCapacity'; also use either field-based on transform-step-based pipe capacity in Validate step as well as Execute step; in AsyncRecordPipeLoop, if pipe capacity is less than minRecordsToConsume, then set minRecordsToConsume down to pipe capacity. change AbstractLoadStep and AbstractTransformStep for StreamedETLWithFrontendProcesses to no implement BackendStep, and as such to (eventually) require a runOnePage method, rather than run (run marked as @Deprecated until apps can migrate);
This commit is contained in:
@ -83,6 +83,15 @@ public class AsyncRecordPipeLoop
|
|||||||
long jobStartTime = System.currentTimeMillis();
|
long jobStartTime = System.currentTimeMillis();
|
||||||
boolean everCalledConsumer = false;
|
boolean everCalledConsumer = false;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
// in case the pipe capacity has been made very small (useful in tests!), //
|
||||||
|
// then make the minRecordsToConsume match it. //
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
if(recordPipe.getCapacity() < minRecordsToConsume)
|
||||||
|
{
|
||||||
|
minRecordsToConsume = recordPipe.getCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
while(jobState.equals(AsyncJobState.RUNNING))
|
while(jobState.equals(AsyncJobState.RUNNING))
|
||||||
{
|
{
|
||||||
if(recordPipe.countAvailableRecords() < minRecordsToConsume)
|
if(recordPipe.countAvailableRecords() < minRecordsToConsume)
|
||||||
|
@ -273,7 +273,7 @@ public class GenerateReportAction
|
|||||||
RunBackendStepOutput transformStepOutput = null;
|
RunBackendStepOutput transformStepOutput = null;
|
||||||
if(tableView != null && tableView.getRecordTransformStep() != null)
|
if(tableView != null && tableView.getRecordTransformStep() != null)
|
||||||
{
|
{
|
||||||
transformStep = QCodeLoader.getBackendStep(AbstractTransformStep.class, tableView.getRecordTransformStep());
|
transformStep = QCodeLoader.getAdHoc(AbstractTransformStep.class, tableView.getRecordTransformStep());
|
||||||
|
|
||||||
transformStepInput = new RunBackendStepInput();
|
transformStepInput = new RunBackendStepInput();
|
||||||
transformStepInput.setValues(reportInput.getInputValues());
|
transformStepInput.setValues(reportInput.getInputValues());
|
||||||
|
@ -44,7 +44,8 @@ public class RecordPipe
|
|||||||
private static final long BLOCKING_SLEEP_MILLIS = 100;
|
private static final long BLOCKING_SLEEP_MILLIS = 100;
|
||||||
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
|
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
|
||||||
|
|
||||||
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(1_000);
|
private int capacity = 1_000;
|
||||||
|
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(capacity);
|
||||||
|
|
||||||
private boolean isTerminated = false;
|
private boolean isTerminated = false;
|
||||||
|
|
||||||
@ -72,6 +73,7 @@ public class RecordPipe
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public RecordPipe(Integer overrideCapacity)
|
public RecordPipe(Integer overrideCapacity)
|
||||||
{
|
{
|
||||||
|
this.capacity = overrideCapacity;
|
||||||
queue = new ArrayBlockingQueue<>(overrideCapacity);
|
queue = new ArrayBlockingQueue<>(overrideCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -213,4 +215,14 @@ public class RecordPipe
|
|||||||
this.postRecordActions = postRecordActions;
|
this.postRecordActions = postRecordActions;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for capacity
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public int getCapacity()
|
||||||
|
{
|
||||||
|
return capacity;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
|||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
@ -38,11 +37,11 @@ import com.kingsrook.qqq.backend.core.model.session.QSession;
|
|||||||
** should be written to the output object's Records, noting that when running
|
** should be written to the output object's Records, noting that when running
|
||||||
** as a streamed-ETL process, those input & output objects will be instances of
|
** as a streamed-ETL process, those input & output objects will be instances of
|
||||||
** the StreamedBackendStep{Input,Output} classes, that will be associated with
|
** the StreamedBackendStep{Input,Output} classes, that will be associated with
|
||||||
** a page of records flowing thorugh a pipe.
|
** a page of records flowing through a pipe.
|
||||||
**
|
**
|
||||||
** Also - use the transaction member variable!!!
|
** Also - use the transaction member variable!!!
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public abstract class AbstractLoadStep implements BackendStep
|
public abstract class AbstractLoadStep
|
||||||
{
|
{
|
||||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||||
protected QSession session;
|
protected QSession session;
|
||||||
@ -51,6 +50,25 @@ public abstract class AbstractLoadStep implements BackendStep
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Deprecated
|
||||||
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
|
{
|
||||||
|
runOnePage(runBackendStepInput, runBackendStepOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** todo - make abstract when run is deleted.
|
||||||
|
*******************************************************************************/
|
||||||
|
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Allow subclasses to do an action before the run is complete - before any
|
** Allow subclasses to do an action before the run is complete - before any
|
||||||
** pages of records are passed in.
|
** pages of records are passed in.
|
||||||
|
@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
|||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
@ -40,12 +39,33 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp
|
|||||||
** a page of records flowing through a pipe.
|
** a page of records flowing through a pipe.
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
public abstract class AbstractTransformStep implements ProcessSummaryProviderInterface
|
||||||
{
|
{
|
||||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Deprecated
|
||||||
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
|
{
|
||||||
|
runOnePage(runBackendStepInput, runBackendStepOutput);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** todo - make abstract when run is deleted.
|
||||||
|
*******************************************************************************/
|
||||||
|
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Allow subclasses to do an action before the run is complete - before any
|
** Allow subclasses to do an action before the run is complete - before any
|
||||||
** pages of records are passed in.
|
** pages of records are passed in.
|
||||||
|
@ -63,7 +63,7 @@ public class BaseStreamedETLStep
|
|||||||
protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput)
|
protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput)
|
||||||
{
|
{
|
||||||
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
|
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
|
||||||
return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference));
|
return (QCodeLoader.getAdHoc(AbstractTransformStep.class, codeReference));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -74,7 +74,7 @@ public class BaseStreamedETLStep
|
|||||||
protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput)
|
protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput)
|
||||||
{
|
{
|
||||||
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
|
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
|
||||||
return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference));
|
return (QCodeLoader.getAdHoc(AbstractLoadStep.class, codeReference));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -83,23 +83,32 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
|||||||
// before it can put more records in. //
|
// before it can put more records in. //
|
||||||
/////////////////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
RecordPipe recordPipe;
|
RecordPipe recordPipe;
|
||||||
Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
|
||||||
if(overrideRecordPipeCapacity != null)
|
if(overrideRecordPipeCapacity != null)
|
||||||
{
|
{
|
||||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||||
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||||
if(overrideRecordPipeCapacity != null)
|
if(overrideRecordPipeCapacity != null)
|
||||||
{
|
{
|
||||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
recordPipe = new RecordPipe();
|
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||||
|
if(overrideRecordPipeCapacity != null)
|
||||||
|
{
|
||||||
|
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||||
|
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
recordPipe = new RecordPipe();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -81,13 +81,41 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
|
|||||||
// basically repeat the preview step, but with no limit //
|
// basically repeat the preview step, but with no limit //
|
||||||
//////////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////////
|
||||||
runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records");
|
runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records");
|
||||||
RecordPipe recordPipe = new RecordPipe();
|
|
||||||
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
|
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
|
||||||
|
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
// let the transform step override the capacity for the record pipe //
|
||||||
|
//////////////////////////////////////////////////////////////////////
|
||||||
|
RecordPipe recordPipe;
|
||||||
|
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
|
||||||
|
if(overrideRecordPipeCapacity != null)
|
||||||
|
{
|
||||||
|
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||||
|
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||||
|
if(overrideRecordPipeCapacity != null)
|
||||||
|
{
|
||||||
|
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||||
|
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
recordPipe = new RecordPipe();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/////////////////////////////
|
||||||
|
// set up the extract step //
|
||||||
|
/////////////////////////////
|
||||||
extractStep.setLimit(null);
|
extractStep.setLimit(null);
|
||||||
extractStep.setRecordPipe(recordPipe);
|
extractStep.setRecordPipe(recordPipe);
|
||||||
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
|
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
|
||||||
|
|
||||||
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
|
|
||||||
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
|
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
|
||||||
|
|
||||||
List<QRecord> previewRecordList = new ArrayList<>();
|
List<QRecord> previewRecordList = new ArrayList<>();
|
||||||
|
@ -151,6 +151,7 @@ public class StreamedETLWithFrontendProcess
|
|||||||
.withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
|
.withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
|
||||||
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass)))
|
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass)))
|
||||||
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass)))
|
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass)))
|
||||||
|
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractTransformStep.class.getName()))
|
||||||
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
|
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
|
||||||
.withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS)))
|
.withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS)))
|
||||||
);
|
);
|
||||||
@ -170,7 +171,8 @@ public class StreamedETLWithFrontendProcess
|
|||||||
.withName(STEP_NAME_EXECUTE)
|
.withName(STEP_NAME_EXECUTE)
|
||||||
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
|
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
|
||||||
.withInputData(new QFunctionInputMetaData()
|
.withInputData(new QFunctionInputMetaData()
|
||||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass))))
|
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass)))
|
||||||
|
.withField(new QFieldMetaData(FIELD_LOAD_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractLoadStep.class.getName())))
|
||||||
.withOutputMetaData(new QFunctionOutputMetaData()
|
.withOutputMetaData(new QFunctionOutputMetaData()
|
||||||
.withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING))
|
.withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING))
|
||||||
);
|
);
|
||||||
|
Reference in New Issue
Block a user