CE-938 - MORE propagation of updated steps from inside streamed-ETL pseudo-steps to top-level process state

This commit is contained in:
2024-06-04 10:58:28 -05:00
parent 90ac1bb9c3
commit 1a66f45425
3 changed files with 50 additions and 1 deletions

View File

@ -174,6 +174,14 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
transformStep.postRun(postRunInput, postRunOutput);
loadStep.postRun(postRunInput, postRunOutput);
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(postRunOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.setUpdatedFrontendStepList(postRunOutput.getUpdatedFrontendStepList());
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// explicitly copy values back into the runStepOutput from the post-run output //
// this might not be needed, since they (presumably) share a processState object, but just in case that changes... //
@ -271,6 +279,15 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
List<AuditInput> auditInputListFromTransform = streamedBackendStepOutput.getAuditInputList();
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(streamedBackendStepOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.getProcessState().setStepList(streamedBackendStepOutput.getProcessState().getStepList());
runBackendStepOutput.setUpdatedFrontendStepList(streamedBackendStepOutput.getUpdatedFrontendStepList());
}
////////////////////////////////////////////////
// pass the records through the load function //
////////////////////////////////////////////////
@ -280,6 +297,15 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
loadStep.run(streamedBackendStepInput, streamedBackendStepOutput);
List<AuditInput> auditInputListFromLoad = streamedBackendStepOutput.getAuditInputList();
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(streamedBackendStepOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.getProcessState().setStepList(streamedBackendStepOutput.getProcessState().getStepList());
runBackendStepOutput.setUpdatedFrontendStepList(streamedBackendStepOutput.getUpdatedFrontendStepList());
}
///////////////////////////////////////////////////////
// copy a small number of records to the output list //
///////////////////////////////////////////////////////

View File

@ -145,7 +145,9 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
BackendStepPostRunInput postRunInput = new BackendStepPostRunInput(runBackendStepInput);
transformStep.postRun(postRunInput, postRunOutput);
// todo figure out what kind of test we can get on this
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(postRunOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.setUpdatedFrontendStepList(postRunOutput.getUpdatedFrontendStepList());
@ -214,6 +216,15 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
/////////////////////////////////////////////////////
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(streamedBackendStepOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.getProcessState().setStepList(streamedBackendStepOutput.getProcessState().getStepList());
runBackendStepOutput.setUpdatedFrontendStepList(streamedBackendStepOutput.getUpdatedFrontendStepList());
}
////////////////////////////////////////////////////
// add the transformed records to the output list //
////////////////////////////////////////////////////

View File

@ -142,6 +142,9 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
BackendStepPostRunInput postRunInput = new BackendStepPostRunInput(runBackendStepInput);
transformStep.postRun(postRunInput, postRunOutput);
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(postRunOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.setUpdatedFrontendStepList(postRunOutput.getUpdatedFrontendStepList());
@ -177,6 +180,15 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
/////////////////////////////////////////////////////
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
//////////////////////////////////////////////////////////////////////
// propagate data from inner-step state to process-level step state //
//////////////////////////////////////////////////////////////////////
if(streamedBackendStepOutput.getUpdatedFrontendStepList() != null)
{
runBackendStepOutput.getProcessState().setStepList(streamedBackendStepOutput.getProcessState().getStepList());
runBackendStepOutput.setUpdatedFrontendStepList(streamedBackendStepOutput.getUpdatedFrontendStepList());
}
///////////////////////////////////////////////////////
// copy a small number of records to the output list //
///////////////////////////////////////////////////////