mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Add auditInputs to process step outputs, and execution of them in streamed ETL Execute
This commit is contained in:
@ -28,6 +28,7 @@ import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.AbstractActionOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.audits.AuditInput;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
|
||||
|
||||
@ -41,6 +42,7 @@ public class RunBackendStepOutput extends AbstractActionOutput implements Serial
|
||||
private ProcessState processState;
|
||||
private Exception exception; // todo - make optional
|
||||
|
||||
private List<AuditInput> auditInputList;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -256,4 +258,35 @@ public class RunBackendStepOutput extends AbstractActionOutput implements Serial
|
||||
this.processState.getRecords().add(record);
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for auditInputList
|
||||
*******************************************************************************/
|
||||
public List<AuditInput> getAuditInputList()
|
||||
{
|
||||
return (this.auditInputList);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Setter for auditInputList
|
||||
*******************************************************************************/
|
||||
public void setAuditInputList(List<AuditInput> auditInputList)
|
||||
{
|
||||
this.auditInputList = auditInputList;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Fluent setter for auditInputList
|
||||
*******************************************************************************/
|
||||
public RunBackendStepOutput withAuditInputList(List<AuditInput> auditInputList)
|
||||
{
|
||||
this.auditInputList = auditInputList;
|
||||
return (this);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -27,14 +27,17 @@ import java.util.List;
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
|
||||
import com.kingsrook.qqq.backend.core.actions.audits.AuditAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.audits.AuditInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -201,6 +204,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
||||
// pass the records through the transform function //
|
||||
/////////////////////////////////////////////////////
|
||||
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
|
||||
List<AuditInput> auditInputListFromTransform = streamedBackendStepOutput.getAuditInputList();
|
||||
|
||||
////////////////////////////////////////////////
|
||||
// pass the records through the load function //
|
||||
@ -209,6 +213,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
||||
streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
|
||||
|
||||
loadStep.run(streamedBackendStepInput, streamedBackendStepOutput);
|
||||
List<AuditInput> auditInputListFromLoad = streamedBackendStepOutput.getAuditInputList();
|
||||
|
||||
///////////////////////////////////////////////////////
|
||||
// copy a small number of records to the output list //
|
||||
@ -219,6 +224,20 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
||||
loadedRecordList.add(streamedBackendStepOutput.getRecords().get(i++));
|
||||
}
|
||||
|
||||
//////////////////////////////////////////////////////
|
||||
// if we have a batch of audit inputs, execute them //
|
||||
//////////////////////////////////////////////////////
|
||||
List<AuditInput> mergedAuditInputList = CollectionUtils.mergeLists(auditInputListFromTransform, auditInputListFromLoad);
|
||||
if(CollectionUtils.nullSafeHasContents(mergedAuditInputList))
|
||||
{
|
||||
AuditAction auditAction = new AuditAction();
|
||||
for(AuditInput auditInput : mergedAuditInputList)
|
||||
{
|
||||
auditAction.execute(auditInput);
|
||||
}
|
||||
}
|
||||
runBackendStepOutput.setAuditInputList(null);
|
||||
|
||||
currentRowCount += qRecords.size();
|
||||
return (qRecords.size());
|
||||
}
|
||||
|
Reference in New Issue
Block a user