From 66f9d1b500825655697d3c122131b6ded4efb1ed Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Sun, 3 Mar 2024 16:45:28 -0600 Subject: [PATCH] CE-889 - improvements for streaming etl pipes: allow StreamedETLWithFrontendProcess pipe capacity to come from field 'recordPipeCapacity'; also use either field-based on transform-step-based pipe capacity in Validate step as well as Execute step; in AsyncRecordPipeLoop, if pipe capacity is less than minRecordsToConsume, then set minRecordsToConsume down to pipe capacity. change AbstractLoadStep and AbstractTransformStep for StreamedETLWithFrontendProcesses to no implement BackendStep, and as such to (eventually) require a runOnePage method, rather than run (run marked as @Deprecated until apps can migrate); --- .../actions/async/AsyncRecordPipeLoop.java | 9 ++++++ .../reporting/GenerateReportAction.java | 2 +- .../core/actions/reporting/RecordPipe.java | 14 +++++++- .../AbstractLoadStep.java | 24 ++++++++++++-- .../AbstractTransformStep.java | 24 ++++++++++++-- .../BaseStreamedETLStep.java | 4 +-- .../StreamedETLExecuteStep.java | 19 ++++++++--- .../StreamedETLValidateStep.java | 32 +++++++++++++++++-- .../StreamedETLWithFrontendProcess.java | 4 ++- 9 files changed, 115 insertions(+), 17 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java index 22d73fb5..74430f7f 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java @@ -83,6 +83,15 @@ public class AsyncRecordPipeLoop long jobStartTime = System.currentTimeMillis(); boolean everCalledConsumer = false; + //////////////////////////////////////////////////////////////////////////// + // in case the pipe capacity has been made very small (useful in tests!), // + // then make the minRecordsToConsume match it. // + //////////////////////////////////////////////////////////////////////////// + if(recordPipe.getCapacity() < minRecordsToConsume) + { + minRecordsToConsume = recordPipe.getCapacity(); + } + while(jobState.equals(AsyncJobState.RUNNING)) { if(recordPipe.countAvailableRecords() < minRecordsToConsume) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/GenerateReportAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/GenerateReportAction.java index 5de10c00..93b9d183 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/GenerateReportAction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/GenerateReportAction.java @@ -273,7 +273,7 @@ public class GenerateReportAction RunBackendStepOutput transformStepOutput = null; if(tableView != null && tableView.getRecordTransformStep() != null) { - transformStep = QCodeLoader.getBackendStep(AbstractTransformStep.class, tableView.getRecordTransformStep()); + transformStep = QCodeLoader.getAdHoc(AbstractTransformStep.class, tableView.getRecordTransformStep()); transformStepInput = new RunBackendStepInput(); transformStepInput.setValues(reportInput.getInputValues()); diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java index dff2c4de..2583855f 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java @@ -44,7 +44,8 @@ public class RecordPipe private static final long BLOCKING_SLEEP_MILLIS = 100; private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes - private ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1_000); + private int capacity = 1_000; + private ArrayBlockingQueue queue = new ArrayBlockingQueue<>(capacity); private boolean isTerminated = false; @@ -72,6 +73,7 @@ public class RecordPipe *******************************************************************************/ public RecordPipe(Integer overrideCapacity) { + this.capacity = overrideCapacity; queue = new ArrayBlockingQueue<>(overrideCapacity); } @@ -213,4 +215,14 @@ public class RecordPipe this.postRecordActions = postRecordActions; } + + + /******************************************************************************* + ** Getter for capacity + ** + *******************************************************************************/ + public int getCapacity() + { + return capacity; + } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java index 835e79d7..483d39d1 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java @@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit import java.util.Optional; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; -import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; @@ -38,11 +37,11 @@ import com.kingsrook.qqq.backend.core.model.session.QSession; ** should be written to the output object's Records, noting that when running ** as a streamed-ETL process, those input & output objects will be instances of ** the StreamedBackendStep{Input,Output} classes, that will be associated with - ** a page of records flowing thorugh a pipe. + ** a page of records flowing through a pipe. ** ** Also - use the transaction member variable!!! *******************************************************************************/ -public abstract class AbstractLoadStep implements BackendStep +public abstract class AbstractLoadStep { private Optional transaction = Optional.empty(); protected QSession session; @@ -51,6 +50,25 @@ public abstract class AbstractLoadStep implements BackendStep + /******************************************************************************* + ** + *******************************************************************************/ + @Deprecated + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + runOnePage(runBackendStepInput, runBackendStepOutput); + } + + + /******************************************************************************* + ** todo - make abstract when run is deleted. + *******************************************************************************/ + public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + + } + + /******************************************************************************* ** Allow subclasses to do an action before the run is complete - before any ** pages of records are passed in. diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java index 187ae972..f81cf726 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java @@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit import java.util.Optional; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; -import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; @@ -40,12 +39,33 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp ** a page of records flowing through a pipe. ** *******************************************************************************/ -public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface +public abstract class AbstractTransformStep implements ProcessSummaryProviderInterface { private Optional transaction = Optional.empty(); + /******************************************************************************* + ** + *******************************************************************************/ + @Deprecated + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + runOnePage(runBackendStepInput, runBackendStepOutput); + } + + + + /******************************************************************************* + ** todo - make abstract when run is deleted. + *******************************************************************************/ + public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + + } + + + /******************************************************************************* ** Allow subclasses to do an action before the run is complete - before any ** pages of records are passed in. diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java index 74cab0b6..172ec4cf 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java @@ -63,7 +63,7 @@ public class BaseStreamedETLStep protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput) { QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE); - return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference)); + return (QCodeLoader.getAdHoc(AbstractTransformStep.class, codeReference)); } @@ -74,7 +74,7 @@ public class BaseStreamedETLStep protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput) { QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE); - return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference)); + return (QCodeLoader.getAdHoc(AbstractLoadStep.class, codeReference)); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index 52280d45..3acfe8d4 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -83,23 +83,32 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe // before it can put more records in. // ///////////////////////////////////////////////////////////////////////////// RecordPipe recordPipe; - Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput); + Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity"); if(overrideRecordPipeCapacity != null) { recordPipe = new RecordPipe(overrideRecordPipeCapacity); - LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); + LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); } else { - overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput); + overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput); if(overrideRecordPipeCapacity != null) { recordPipe = new RecordPipe(overrideRecordPipeCapacity); - LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); + LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); } else { - recordPipe = new RecordPipe(); + overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput); + if(overrideRecordPipeCapacity != null) + { + recordPipe = new RecordPipe(overrideRecordPipeCapacity); + LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); + } + else + { + recordPipe = new RecordPipe(); + } } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLValidateStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLValidateStep.java index 63d858c1..ed177c52 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLValidateStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLValidateStep.java @@ -81,13 +81,41 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back // basically repeat the preview step, but with no limit // ////////////////////////////////////////////////////////// runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records"); - RecordPipe recordPipe = new RecordPipe(); + AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); + AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); + + ////////////////////////////////////////////////////////////////////// + // let the transform step override the capacity for the record pipe // + ////////////////////////////////////////////////////////////////////// + RecordPipe recordPipe; + Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity"); + if(overrideRecordPipeCapacity != null) + { + recordPipe = new RecordPipe(overrideRecordPipeCapacity); + LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); + } + else + { + overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput); + if(overrideRecordPipeCapacity != null) + { + recordPipe = new RecordPipe(overrideRecordPipeCapacity); + LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); + } + else + { + recordPipe = new RecordPipe(); + } + } + + ///////////////////////////// + // set up the extract step // + ///////////////////////////// extractStep.setLimit(null); extractStep.setRecordPipe(recordPipe); extractStep.preRun(runBackendStepInput, runBackendStepOutput); - AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); transformStep.preRun(runBackendStepInput, runBackendStepOutput); List previewRecordList = new ArrayList<>(); diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java index 06b19ff4..9c279017 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java @@ -151,6 +151,7 @@ public class StreamedETLWithFrontendProcess .withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER))) .withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass))) .withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass))) + .withField(new QFieldMetaData(FIELD_TRANSFORM_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractTransformStep.class.getName())) .withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT))) .withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS))) ); @@ -170,7 +171,8 @@ public class StreamedETLWithFrontendProcess .withName(STEP_NAME_EXECUTE) .withCode(new QCodeReference(StreamedETLExecuteStep.class)) .withInputData(new QFunctionInputMetaData() - .withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass)))) + .withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass))) + .withField(new QFieldMetaData(FIELD_LOAD_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractLoadStep.class.getName()))) .withOutputMetaData(new QFunctionOutputMetaData() .withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING)) );