From da116349ff5e0e165ee380f67ce9f88b35d7cf79 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Thu, 27 Oct 2022 11:02:47 -0500 Subject: [PATCH] Update to only update basepull timestamp after execution; make sure preRun runs before count --- .../actions/processes/RunProcessAction.java | 18 ++++++++----- .../StreamedETLExecuteStep.java | 5 ++++ .../StreamedETLPreviewStep.java | 27 +++++++++---------- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java index e5ef9755..34e33336 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java @@ -59,6 +59,7 @@ import com.kingsrook.qqq.backend.core.state.StateType; import com.kingsrook.qqq.backend.core.state.UUIDAndTypeStateKey; import com.kingsrook.qqq.backend.core.utils.CollectionUtils; import com.kingsrook.qqq.backend.core.utils.ValueUtils; +import org.apache.commons.lang.BooleanUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -75,6 +76,11 @@ public class RunProcessAction public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey"; public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField"; + //////////////////////////////////////////////////////////////////////////////////////////////// + // indicator that the timestamp field should be updated - e.g., the execute step is finished. // + //////////////////////////////////////////////////////////////////////////////////////////////// + public static final String BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD = "basepullReadyToUpdateTimestamp"; + /******************************************************************************* @@ -184,14 +190,12 @@ public class RunProcessAction } } - /////////////////////////////////////////////////////////////////////////////// - // if 'basepull' style process, store the time stored before process was ran // - /////////////////////////////////////////////////////////////////////////////// - if(basepullConfiguration != null) + //////////////////////////////////////////////////////////////////////////////////// + // if 'basepull' style process, update the stored basepull timestamp // + // but only when we've been signaled to do so - i.e., after an Execute step runs. // + //////////////////////////////////////////////////////////////////////////////////// + if(basepullConfiguration != null && BooleanUtils.isTrue(ValueUtils.getValueAsBoolean(runProcessInput.getValue(BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD)))) { - /////////////////////////////////////// - // get the stored basepull timestamp // - /////////////////////////////////////// storeLastRunTime(runProcessInput, process, basepullConfiguration); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index bcf93dc3..84dae779 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -97,6 +97,11 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe transformStep.postRun(runBackendStepInput, runBackendStepOutput); loadStep.postRun(runBackendStepInput, runBackendStepOutput); + ////////////////////////////////////////////////////////////////////////////// + // set the flag to state that the basepull timestamp should be updated now. // + ////////////////////////////////////////////////////////////////////////////// + runBackendStepOutput.addValue(RunProcessAction.BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD, true); + ///////////////////// // commit the work // ///////////////////// diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java index aa587be9..7c3c782e 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java @@ -82,13 +82,21 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe return; } - /////////////////////////////////////////// - // request a count from the extract step // - /////////////////////////////////////////// + ////////////////////////////////////////// + // set up the extract & transform steps // + ////////////////////////////////////////// AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); - Integer recordCount = extractStep.doCount(runBackendStepInput); + RecordPipe recordPipe = new RecordPipe(); + extractStep.setLimit(limit); + extractStep.setRecordPipe(recordPipe); + extractStep.preRun(runBackendStepInput, runBackendStepOutput); + + Integer recordCount = extractStep.doCount(runBackendStepInput); runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); + AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); + transformStep.preRun(runBackendStepInput, runBackendStepOutput); + //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // if the count is less than the normal limit here, and this process supports validation, then go straight to the validation step // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// @@ -100,17 +108,6 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe // return; // } - //////////////////////////////////////////////////////// - // proceed with a doing a limited extract & transform // - //////////////////////////////////////////////////////// - RecordPipe recordPipe = new RecordPipe(); - extractStep.setLimit(limit); - extractStep.setRecordPipe(recordPipe); - extractStep.preRun(runBackendStepInput, runBackendStepOutput); - - AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); - transformStep.preRun(runBackendStepInput, runBackendStepOutput); - List previewRecordList = new ArrayList<>(); new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) -> {