Let StreamedETLWithFrontendProcesses have different transaction levels

This commit is contained in:
2023-02-27 10:27:11 -06:00
parent ea731bac5c
commit 4f3c03de1a
2 changed files with 142 additions and 57 deletions

View File

@ -101,9 +101,16 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
transformStep.preRun(runBackendStepInput, runBackendStepOutput); transformStep.preRun(runBackendStepInput, runBackendStepOutput);
loadStep.preRun(runBackendStepInput, runBackendStepOutput); loadStep.preRun(runBackendStepInput, runBackendStepOutput);
transaction = loadStep.openTransaction(runBackendStepInput); /////////////////////////////////////////////////////////////////////////////
loadStep.setTransaction(transaction); // open a transaction for the whole process, if that's the requested level //
transformStep.setTransaction(transaction); /////////////////////////////////////////////////////////////////////////////
boolean doProcessLevelTransaction = StreamedETLWithFrontendProcess.TRANSACTION_LEVEL_PROCESS.equals(runBackendStepInput.getValueString(StreamedETLWithFrontendProcess.FIELD_TRANSACTION_LEVEL));
if(doProcessLevelTransaction)
{
transaction = loadStep.openTransaction(runBackendStepInput);
loadStep.setTransaction(transaction);
transformStep.setTransaction(transaction);
}
List<QRecord> loadedRecordList = new ArrayList<>(); List<QRecord> loadedRecordList = new ArrayList<>();
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) -> int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) ->
@ -145,10 +152,10 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
runBackendStepOutput.addValue(RunProcessAction.BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD, true); runBackendStepOutput.addValue(RunProcessAction.BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD, true);
///////////////////// ////////////////////////////////////////////////////////
// commit the work // // commit the work at the process level if applicable //
///////////////////// ////////////////////////////////////////////////////////
if(transaction.isPresent()) if(doProcessLevelTransaction && transaction.isPresent())
{ {
transaction.get().commit(); transaction.get().commit();
} }
@ -183,63 +190,98 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
*******************************************************************************/ *******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, AbstractLoadStep loadStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> loadedRecordList) throws QException private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, AbstractLoadStep loadStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> loadedRecordList) throws QException
{ {
Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT); /////////////////////////////////////////////////////////////////////////////
if(totalRows != null) // open a transaction for the whole process, if that's the requested level //
/////////////////////////////////////////////////////////////////////////////
Optional<QBackendTransaction> transaction = Optional.empty();
boolean doPageLevelTransaction = StreamedETLWithFrontendProcess.TRANSACTION_LEVEL_PAGE.equals(runBackendStepInput.getValueString(StreamedETLWithFrontendProcess.FIELD_TRANSACTION_LEVEL));
if(doPageLevelTransaction)
{ {
runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows); transaction = loadStep.openTransaction(runBackendStepInput);
loadStep.setTransaction(transaction);
transformStep.setTransaction(transaction);
} }
/////////////////////////////////// try
// get the records from the pipe //
///////////////////////////////////
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
///////////////////////////////////////////////////////////////////////
// make streamed input & output objects from the run input & outputs //
///////////////////////////////////////////////////////////////////////
StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords);
StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
/////////////////////////////////////////////////////
// pass the records through the transform function //
/////////////////////////////////////////////////////
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
List<AuditInput> auditInputListFromTransform = streamedBackendStepOutput.getAuditInputList();
////////////////////////////////////////////////
// pass the records through the load function //
////////////////////////////////////////////////
streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, streamedBackendStepOutput.getRecords());
streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
loadStep.run(streamedBackendStepInput, streamedBackendStepOutput);
List<AuditInput> auditInputListFromLoad = streamedBackendStepOutput.getAuditInputList();
///////////////////////////////////////////////////////
// copy a small number of records to the output list //
///////////////////////////////////////////////////////
int i = 0;
while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < streamedBackendStepOutput.getRecords().size())
{ {
loadedRecordList.add(streamedBackendStepOutput.getRecords().get(i++)); Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT);
} if(totalRows != null)
//////////////////////////////////////////////////////
// if we have a batch of audit inputs, execute them //
//////////////////////////////////////////////////////
List<AuditInput> mergedAuditInputList = CollectionUtils.mergeLists(auditInputListFromTransform, auditInputListFromLoad);
if(CollectionUtils.nullSafeHasContents(mergedAuditInputList))
{
AuditAction auditAction = new AuditAction();
for(AuditInput auditInput : mergedAuditInputList)
{ {
auditAction.execute(auditInput); runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows);
}
///////////////////////////////////
// get the records from the pipe //
///////////////////////////////////
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
///////////////////////////////////////////////////////////////////////
// make streamed input & output objects from the run input & outputs //
///////////////////////////////////////////////////////////////////////
StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords);
StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
/////////////////////////////////////////////////////
// pass the records through the transform function //
/////////////////////////////////////////////////////
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
List<AuditInput> auditInputListFromTransform = streamedBackendStepOutput.getAuditInputList();
////////////////////////////////////////////////
// pass the records through the load function //
////////////////////////////////////////////////
streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, streamedBackendStepOutput.getRecords());
streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
loadStep.run(streamedBackendStepInput, streamedBackendStepOutput);
List<AuditInput> auditInputListFromLoad = streamedBackendStepOutput.getAuditInputList();
///////////////////////////////////////////////////////
// copy a small number of records to the output list //
///////////////////////////////////////////////////////
int i = 0;
while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < streamedBackendStepOutput.getRecords().size())
{
loadedRecordList.add(streamedBackendStepOutput.getRecords().get(i++));
}
//////////////////////////////////////////////////////
// if we have a batch of audit inputs, execute them //
//////////////////////////////////////////////////////
List<AuditInput> mergedAuditInputList = CollectionUtils.mergeLists(auditInputListFromTransform, auditInputListFromLoad);
if(CollectionUtils.nullSafeHasContents(mergedAuditInputList))
{
AuditAction auditAction = new AuditAction();
for(AuditInput auditInput : mergedAuditInputList)
{
auditAction.execute(auditInput);
}
}
runBackendStepOutput.setAuditInputList(null);
if(doPageLevelTransaction && transaction.isPresent())
{
transaction.get().commit();
}
currentRowCount += qRecords.size();
return (qRecords.size());
}
catch(Exception e)
{
if(doPageLevelTransaction && transaction.isPresent())
{
transaction.get().rollback();
}
throw (e);
}
finally
{
if(doPageLevelTransaction && transaction.isPresent())
{
transaction.get().close();
} }
} }
runBackendStepOutput.setAuditInputList(null);
currentRowCount += qRecords.size();
return (qRecords.size());
} }
} }

View File

@ -93,6 +93,11 @@ public class StreamedETLWithFrontendProcess
public static final String DEFAULT_PREVIEW_MESSAGE_FOR_DELETE = "This is a preview of the records that will be deleted."; public static final String DEFAULT_PREVIEW_MESSAGE_FOR_DELETE = "This is a preview of the records that will be deleted.";
public static final String FIELD_PREVIEW_MESSAGE = "previewMessage"; public static final String FIELD_PREVIEW_MESSAGE = "previewMessage";
public static final String FIELD_TRANSACTION_LEVEL = "transactionLevel";
public static final String TRANSACTION_LEVEL_AUTO_COMMIT = "autoCommit";
public static final String TRANSACTION_LEVEL_PAGE = "page";
public static final String TRANSACTION_LEVEL_PROCESS = "process";
/******************************************************************************* /*******************************************************************************
@ -109,6 +114,7 @@ public class StreamedETLWithFrontendProcess
Map<String, Serializable> defaultFieldValues = new HashMap<>(); Map<String, Serializable> defaultFieldValues = new HashMap<>();
defaultFieldValues.put(FIELD_SOURCE_TABLE, sourceTableName); defaultFieldValues.put(FIELD_SOURCE_TABLE, sourceTableName);
defaultFieldValues.put(FIELD_DESTINATION_TABLE, destinationTableName); defaultFieldValues.put(FIELD_DESTINATION_TABLE, destinationTableName);
defaultFieldValues.put(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS);
return defineProcessMetaData(extractStepClass, transformStepClass, loadStepClass, defaultFieldValues); return defineProcessMetaData(extractStepClass, transformStepClass, loadStepClass, defaultFieldValues);
} }
@ -141,6 +147,7 @@ public class StreamedETLWithFrontendProcess
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass))) .withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass)))
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass))) .withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass)))
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT))) .withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
.withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS)))
); );
QFrontendStepMetaData reviewStep = new QFrontendStepMetaData() QFrontendStepMetaData reviewStep = new QFrontendStepMetaData()
@ -278,6 +285,42 @@ public class StreamedETLWithFrontendProcess
/*******************************************************************************
** Fluent setter to set transaction level to auto-commit
**
*******************************************************************************/
public Builder withTransactionLevelAutoCommit()
{
setInputFieldDefaultValue(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_AUTO_COMMIT);
return (this);
}
/*******************************************************************************
** Fluent setter to set transaction level to page
**
*******************************************************************************/
public Builder withTransactionLevelPage()
{
setInputFieldDefaultValue(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PAGE);
return (this);
}
/*******************************************************************************
** Fluent setter to set transaction level to process
**
*******************************************************************************/
public Builder withTransactionLevelProcess()
{
setInputFieldDefaultValue(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS);
return (this);
}
/******************************************************************************* /*******************************************************************************
** Fluent setter for doFullValidation ** Fluent setter for doFullValidation
** **