Update to only update basepull timestamp after execution; make sure preRun runs before count

This commit is contained in:
2022-10-27 11:02:47 -05:00
parent 5e7b6f40df
commit da116349ff
3 changed files with 28 additions and 22 deletions

View File

@ -59,6 +59,7 @@ import com.kingsrook.qqq.backend.core.state.StateType;
import com.kingsrook.qqq.backend.core.state.UUIDAndTypeStateKey; import com.kingsrook.qqq.backend.core.state.UUIDAndTypeStateKey;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils; import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils; import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import org.apache.commons.lang.BooleanUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -75,6 +76,11 @@ public class RunProcessAction
public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey"; public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey";
public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField"; public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField";
////////////////////////////////////////////////////////////////////////////////////////////////
// indicator that the timestamp field should be updated - e.g., the execute step is finished. //
////////////////////////////////////////////////////////////////////////////////////////////////
public static final String BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD = "basepullReadyToUpdateTimestamp";
/******************************************************************************* /*******************************************************************************
@ -184,14 +190,12 @@ public class RunProcessAction
} }
} }
/////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////
// if 'basepull' style process, store the time stored before process was ran // // if 'basepull' style process, update the stored basepull timestamp //
/////////////////////////////////////////////////////////////////////////////// // but only when we've been signaled to do so - i.e., after an Execute step runs. //
if(basepullConfiguration != null) ////////////////////////////////////////////////////////////////////////////////////
if(basepullConfiguration != null && BooleanUtils.isTrue(ValueUtils.getValueAsBoolean(runProcessInput.getValue(BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD))))
{ {
///////////////////////////////////////
// get the stored basepull timestamp //
///////////////////////////////////////
storeLastRunTime(runProcessInput, process, basepullConfiguration); storeLastRunTime(runProcessInput, process, basepullConfiguration);
} }
} }

View File

@ -97,6 +97,11 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
transformStep.postRun(runBackendStepInput, runBackendStepOutput); transformStep.postRun(runBackendStepInput, runBackendStepOutput);
loadStep.postRun(runBackendStepInput, runBackendStepOutput); loadStep.postRun(runBackendStepInput, runBackendStepOutput);
//////////////////////////////////////////////////////////////////////////////
// set the flag to state that the basepull timestamp should be updated now. //
//////////////////////////////////////////////////////////////////////////////
runBackendStepOutput.addValue(RunProcessAction.BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD, true);
///////////////////// /////////////////////
// commit the work // // commit the work //
///////////////////// /////////////////////

View File

@ -82,13 +82,21 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
return; return;
} }
/////////////////////////////////////////// //////////////////////////////////////////
// request a count from the extract step // // set up the extract & transform steps //
/////////////////////////////////////////// //////////////////////////////////////////
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
RecordPipe recordPipe = new RecordPipe();
extractStep.setLimit(limit);
extractStep.setRecordPipe(recordPipe);
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
Integer recordCount = extractStep.doCount(runBackendStepInput); Integer recordCount = extractStep.doCount(runBackendStepInput);
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// if the count is less than the normal limit here, and this process supports validation, then go straight to the validation step // // if the count is less than the normal limit here, and this process supports validation, then go straight to the validation step //
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -100,17 +108,6 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
// return; // return;
// } // }
////////////////////////////////////////////////////////
// proceed with a doing a limited extract & transform //
////////////////////////////////////////////////////////
RecordPipe recordPipe = new RecordPipe();
extractStep.setLimit(limit);
extractStep.setRecordPipe(recordPipe);
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
List<QRecord> previewRecordList = new ArrayList<>(); List<QRecord> previewRecordList = new ArrayList<>();
new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) -> new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) ->
{ {