diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index 03e5bbd6..01bbe647 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -101,9 +101,16 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe transformStep.preRun(runBackendStepInput, runBackendStepOutput); loadStep.preRun(runBackendStepInput, runBackendStepOutput); - transaction = loadStep.openTransaction(runBackendStepInput); - loadStep.setTransaction(transaction); - transformStep.setTransaction(transaction); + ///////////////////////////////////////////////////////////////////////////// + // open a transaction for the whole process, if that's the requested level // + ///////////////////////////////////////////////////////////////////////////// + boolean doProcessLevelTransaction = StreamedETLWithFrontendProcess.TRANSACTION_LEVEL_PROCESS.equals(runBackendStepInput.getValueString(StreamedETLWithFrontendProcess.FIELD_TRANSACTION_LEVEL)); + if(doProcessLevelTransaction) + { + transaction = loadStep.openTransaction(runBackendStepInput); + loadStep.setTransaction(transaction); + transformStep.setTransaction(transaction); + } List loadedRecordList = new ArrayList<>(); int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) -> @@ -145,10 +152,10 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe ////////////////////////////////////////////////////////////////////////////// runBackendStepOutput.addValue(RunProcessAction.BASEPULL_READY_TO_UPDATE_TIMESTAMP_FIELD, true); - ///////////////////// - // commit the work // - ///////////////////// - if(transaction.isPresent()) + //////////////////////////////////////////////////////// + // commit the work at the process level if applicable // + //////////////////////////////////////////////////////// + if(doProcessLevelTransaction && transaction.isPresent()) { transaction.get().commit(); } @@ -183,63 +190,98 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe *******************************************************************************/ private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, AbstractLoadStep loadStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List loadedRecordList) throws QException { - Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT); - if(totalRows != null) + ///////////////////////////////////////////////////////////////////////////// + // open a transaction for the whole process, if that's the requested level // + ///////////////////////////////////////////////////////////////////////////// + Optional transaction = Optional.empty(); + boolean doPageLevelTransaction = StreamedETLWithFrontendProcess.TRANSACTION_LEVEL_PAGE.equals(runBackendStepInput.getValueString(StreamedETLWithFrontendProcess.FIELD_TRANSACTION_LEVEL)); + if(doPageLevelTransaction) { - runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows); + transaction = loadStep.openTransaction(runBackendStepInput); + loadStep.setTransaction(transaction); + transformStep.setTransaction(transaction); } - /////////////////////////////////// - // get the records from the pipe // - /////////////////////////////////// - List qRecords = recordPipe.consumeAvailableRecords(); - - /////////////////////////////////////////////////////////////////////// - // make streamed input & output objects from the run input & outputs // - /////////////////////////////////////////////////////////////////////// - StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords); - StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput); - - ///////////////////////////////////////////////////// - // pass the records through the transform function // - ///////////////////////////////////////////////////// - transformStep.run(streamedBackendStepInput, streamedBackendStepOutput); - List auditInputListFromTransform = streamedBackendStepOutput.getAuditInputList(); - - //////////////////////////////////////////////// - // pass the records through the load function // - //////////////////////////////////////////////// - streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, streamedBackendStepOutput.getRecords()); - streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput); - - loadStep.run(streamedBackendStepInput, streamedBackendStepOutput); - List auditInputListFromLoad = streamedBackendStepOutput.getAuditInputList(); - - /////////////////////////////////////////////////////// - // copy a small number of records to the output list // - /////////////////////////////////////////////////////// - int i = 0; - while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < streamedBackendStepOutput.getRecords().size()) + try { - loadedRecordList.add(streamedBackendStepOutput.getRecords().get(i++)); - } - - ////////////////////////////////////////////////////// - // if we have a batch of audit inputs, execute them // - ////////////////////////////////////////////////////// - List mergedAuditInputList = CollectionUtils.mergeLists(auditInputListFromTransform, auditInputListFromLoad); - if(CollectionUtils.nullSafeHasContents(mergedAuditInputList)) - { - AuditAction auditAction = new AuditAction(); - for(AuditInput auditInput : mergedAuditInputList) + Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT); + if(totalRows != null) { - auditAction.execute(auditInput); + runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows); + } + + /////////////////////////////////// + // get the records from the pipe // + /////////////////////////////////// + List qRecords = recordPipe.consumeAvailableRecords(); + + /////////////////////////////////////////////////////////////////////// + // make streamed input & output objects from the run input & outputs // + /////////////////////////////////////////////////////////////////////// + StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords); + StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput); + + ///////////////////////////////////////////////////// + // pass the records through the transform function // + ///////////////////////////////////////////////////// + transformStep.run(streamedBackendStepInput, streamedBackendStepOutput); + List auditInputListFromTransform = streamedBackendStepOutput.getAuditInputList(); + + //////////////////////////////////////////////// + // pass the records through the load function // + //////////////////////////////////////////////// + streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, streamedBackendStepOutput.getRecords()); + streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput); + + loadStep.run(streamedBackendStepInput, streamedBackendStepOutput); + List auditInputListFromLoad = streamedBackendStepOutput.getAuditInputList(); + + /////////////////////////////////////////////////////// + // copy a small number of records to the output list // + /////////////////////////////////////////////////////// + int i = 0; + while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < streamedBackendStepOutput.getRecords().size()) + { + loadedRecordList.add(streamedBackendStepOutput.getRecords().get(i++)); + } + + ////////////////////////////////////////////////////// + // if we have a batch of audit inputs, execute them // + ////////////////////////////////////////////////////// + List mergedAuditInputList = CollectionUtils.mergeLists(auditInputListFromTransform, auditInputListFromLoad); + if(CollectionUtils.nullSafeHasContents(mergedAuditInputList)) + { + AuditAction auditAction = new AuditAction(); + for(AuditInput auditInput : mergedAuditInputList) + { + auditAction.execute(auditInput); + } + } + runBackendStepOutput.setAuditInputList(null); + + if(doPageLevelTransaction && transaction.isPresent()) + { + transaction.get().commit(); + } + + currentRowCount += qRecords.size(); + return (qRecords.size()); + } + catch(Exception e) + { + if(doPageLevelTransaction && transaction.isPresent()) + { + transaction.get().rollback(); + } + throw (e); + } + finally + { + if(doPageLevelTransaction && transaction.isPresent()) + { + transaction.get().close(); } } - runBackendStepOutput.setAuditInputList(null); - - currentRowCount += qRecords.size(); - return (qRecords.size()); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java index e2f57883..10d49799 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java @@ -93,6 +93,11 @@ public class StreamedETLWithFrontendProcess public static final String DEFAULT_PREVIEW_MESSAGE_FOR_DELETE = "This is a preview of the records that will be deleted."; public static final String FIELD_PREVIEW_MESSAGE = "previewMessage"; + public static final String FIELD_TRANSACTION_LEVEL = "transactionLevel"; + public static final String TRANSACTION_LEVEL_AUTO_COMMIT = "autoCommit"; + public static final String TRANSACTION_LEVEL_PAGE = "page"; + public static final String TRANSACTION_LEVEL_PROCESS = "process"; + /******************************************************************************* @@ -109,6 +114,7 @@ public class StreamedETLWithFrontendProcess Map defaultFieldValues = new HashMap<>(); defaultFieldValues.put(FIELD_SOURCE_TABLE, sourceTableName); defaultFieldValues.put(FIELD_DESTINATION_TABLE, destinationTableName); + defaultFieldValues.put(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS); return defineProcessMetaData(extractStepClass, transformStepClass, loadStepClass, defaultFieldValues); } @@ -141,6 +147,7 @@ public class StreamedETLWithFrontendProcess .withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass))) .withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass))) .withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT))) + .withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS))) ); QFrontendStepMetaData reviewStep = new QFrontendStepMetaData() @@ -278,6 +285,42 @@ public class StreamedETLWithFrontendProcess + /******************************************************************************* + ** Fluent setter to set transaction level to auto-commit + ** + *******************************************************************************/ + public Builder withTransactionLevelAutoCommit() + { + setInputFieldDefaultValue(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_AUTO_COMMIT); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter to set transaction level to page + ** + *******************************************************************************/ + public Builder withTransactionLevelPage() + { + setInputFieldDefaultValue(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PAGE); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter to set transaction level to process + ** + *******************************************************************************/ + public Builder withTransactionLevelProcess() + { + setInputFieldDefaultValue(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS); + return (this); + } + + + /******************************************************************************* ** Fluent setter for doFullValidation **