From a86f42f373d660c6c5271286f06d20eb0af6ece2 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Mon, 22 Aug 2022 08:34:55 -0500 Subject: [PATCH] QQQ-37 initial buidout of StreamedETLWithFrontendProcess --- .../core/actions/async/AsyncJobCallback.java | 14 +- .../core/actions/async/AsyncJobManager.java | 11 ++ .../core/actions/async/AsyncJobStatus.java | 24 +++ .../actions/async/AsyncRecordPipeLoop.java | 157 ++++++++++++++++++ .../core/actions/customizers/QCodeLoader.java | 41 +++++ .../actions/interfaces/InsertInterface.java | 11 +- .../actions/interfaces/QActionInterface.java | 23 +++ .../core/actions/reporting/RecordPipe.java | 31 +++- .../actions/tables/query/QueryOutput.java | 4 +- .../actions/tables/update/UpdateInput.java | 38 ++++- .../metadata/processes/QProcessMetaData.java | 10 ++ .../AbstractExtractFunction.java | 58 +++++++ .../AbstractLoadFunction.java | 85 ++++++++++ .../AbstractTransformFunction.java | 61 +++++++ .../BaseStreamedETLStep.java | 49 ++++++ .../StreamedETLExecuteStep.java | 146 ++++++++++++++++ .../StreamedETLPreviewStep.java | 95 +++++++++++ .../StreamedETLWithFrontendProcess.java | 89 ++++++++++ .../rdbms/actions/AbstractRDBMSAction.java | 32 +++- .../rdbms/actions/RDBMSInsertAction.java | 23 --- .../rdbms/actions/RDBMSQueryAction.java | 6 + .../rdbms/actions/RDBMSUpdateAction.java | 39 ++++- 22 files changed, 998 insertions(+), 49 deletions(-) create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QActionInterface.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java index fd5993c3..ec9ff1de 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java @@ -31,7 +31,6 @@ import com.kingsrook.qqq.backend.core.state.UUIDAndTypeStateKey; ** Argument passed to an AsyncJob when it runs, which can be used to communicate ** data back out of the job. ** - ** TODO - future - allow cancellation to be indicated here? *******************************************************************************/ public class AsyncJobCallback { @@ -107,4 +106,17 @@ public class AsyncJobCallback AsyncJobManager.getStateProvider().put(new UUIDAndTypeStateKey(jobUUID, StateType.ASYNC_JOB_STATUS), asyncJobStatus); } + + + /******************************************************************************* + ** Check if the asyncJobStatus had a cancellation requested. + ** + ** TODO - concern about multiple threads writing this object to a non-in-memory + ** state provider, and this value getting lost... + *******************************************************************************/ + public boolean wasCancelRequested() + { + return (this.asyncJobStatus.getCancelRequested()); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java index 63766320..fbf95f07 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java @@ -183,4 +183,15 @@ public class AsyncJobManager // return TempFileStateProvider.getInstance(); } + + + /******************************************************************************* + ** + *******************************************************************************/ + public void cancelJob(String jobUUID) + { + Optional jobStatus = getJobStatus(jobUUID); + jobStatus.ifPresent(asyncJobStatus -> asyncJobStatus.setCancelRequested(true)); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobStatus.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobStatus.java index 83810512..3ec54516 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobStatus.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobStatus.java @@ -37,6 +37,8 @@ public class AsyncJobStatus implements Serializable private Integer total; private Exception caughtException; + private boolean cancelRequested; + /******************************************************************************* @@ -163,4 +165,26 @@ public class AsyncJobStatus implements Serializable { this.caughtException = caughtException; } + + + + /******************************************************************************* + ** Getter for cancelRequested + ** + *******************************************************************************/ + public boolean getCancelRequested() + { + return cancelRequested; + } + + + + /******************************************************************************* + ** Setter for cancelRequested + ** + *******************************************************************************/ + public void setCancelRequested(boolean cancelRequested) + { + this.cancelRequested = cancelRequested; + } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java new file mode 100644 index 00000000..554ec0a1 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java @@ -0,0 +1,157 @@ +package com.kingsrook.qqq.backend.core.actions.async; + + +import java.io.Serializable; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.utils.SleepUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +/******************************************************************************* + ** + *******************************************************************************/ +public class AsyncRecordPipeLoop +{ + private static final Logger LOG = LogManager.getLogger(AsyncRecordPipeLoop.class); + + private static final int TIMEOUT_AFTER_NO_RECORDS_MS = 10 * 60 * 1000; + + private static final int MAX_SLEEP_MS = 1000; + private static final int INIT_SLEEP_MS = 10; + + + + /******************************************************************************* + ** + *******************************************************************************/ + public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction job, UnsafeSupplier consumer) throws QException + { + /////////////////////////////////////////////////// + // start the extraction function as an async job // + /////////////////////////////////////////////////// + AsyncJobManager asyncJobManager = new AsyncJobManager(); + String jobUUID = asyncJobManager.startJob(jobName, job::apply); + LOG.info("Started job [" + jobUUID + "] for record pipe streaming"); + + AsyncJobState jobState = AsyncJobState.RUNNING; + AsyncJobStatus asyncJobStatus = null; + + int recordCount = 0; + int nextSleepMillis = INIT_SLEEP_MS; + long lastReceivedRecordsAt = System.currentTimeMillis(); + long jobStartTime = System.currentTimeMillis(); + + while(jobState.equals(AsyncJobState.RUNNING)) + { + if(recordPipe.countAvailableRecords() == 0) + { + /////////////////////////////////////////////////////////// + // if the pipe is empty, sleep to let the producer work. // + // todo - smarter sleep? like get notified vs. sleep? // + /////////////////////////////////////////////////////////// + LOG.info("No records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work"); + SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS); + nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS); + + long timeSinceLastReceivedRecord = System.currentTimeMillis() - lastReceivedRecordsAt; + if(timeSinceLastReceivedRecord > TIMEOUT_AFTER_NO_RECORDS_MS) + { + throw (new QException("Job appears to have stopped producing records (last record received " + timeSinceLastReceivedRecord + " ms ago).")); + } + } + else + { + //////////////////////////////////////////////////////////////////////////////////////////////////////// + // if the pipe has records, consume them. reset the sleep timer so if we sleep again it'll be short. // + //////////////////////////////////////////////////////////////////////////////////////////////////////// + lastReceivedRecordsAt = System.currentTimeMillis(); + nextSleepMillis = INIT_SLEEP_MS; + + recordCount += consumer.get(); + LOG.info(String.format("Processed %,d records so far", recordCount)); + + if(recordLimit != null && recordCount >= recordLimit) + { + asyncJobManager.cancelJob(jobUUID); + + //////////////////////////////////////////////////////////////////////////////////////////////////// + // in case the extract function doesn't recognize the cancellation request, // + // tell the pipe to "terminate" - meaning - flush its queue and just noop when given new records. // + // this should prevent anyone writing to such a pipe from potentially filling & blocking. // + //////////////////////////////////////////////////////////////////////////////////////////////////// + recordPipe.terminate(); + + break; + } + } + + ////////////////////////////// + // refresh the job's status // + ////////////////////////////// + Optional optionalAsyncJobStatus = asyncJobManager.getJobStatus(jobUUID); + if(optionalAsyncJobStatus.isEmpty()) + { + ///////////////////////////////////////////////// + // todo - ... maybe some version of try-again? // + ///////////////////////////////////////////////// + throw (new QException("Could not get status of job [" + jobUUID + "]")); + } + asyncJobStatus = optionalAsyncJobStatus.get(); + jobState = asyncJobStatus.getState(); + } + + LOG.info("Job [" + jobUUID + "] completed with status: " + asyncJobStatus); + + /////////////////////////////////// + // propagate errors from the job // + /////////////////////////////////// + if(asyncJobStatus != null && asyncJobStatus.getState().equals(AsyncJobState.ERROR)) + { + throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException())); + } + + ////////////////////////////////////////////////////// + // send the final records to transform & load steps // + ////////////////////////////////////////////////////// + recordCount += consumer.get(); + + long endTime = System.currentTimeMillis(); + LOG.info(String.format("Processed %,d records", recordCount) + + String.format(" at end of job in %,d ms (%.2f records/second).", (endTime - jobStartTime), 1000d * (recordCount / (.001d + (endTime - jobStartTime))))); + + return (recordCount); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @FunctionalInterface + public interface UnsafeFunction + { + /******************************************************************************* + ** + *******************************************************************************/ + R apply(T t) throws QException; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @FunctionalInterface + public interface UnsafeSupplier + { + /******************************************************************************* + ** + *******************************************************************************/ + T get() throws QException; + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/QCodeLoader.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/QCodeLoader.java index 8e7c0dfc..bcc8a7a7 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/QCodeLoader.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/QCodeLoader.java @@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.actions.customizers; import java.util.Optional; import java.util.function.Function; +import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference; import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeType; import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; @@ -93,4 +94,44 @@ public class QCodeLoader } } + + + /******************************************************************************* + ** + *******************************************************************************/ + public static T getBackendStep(Class expectedType, QCodeReference codeReference) + { + if(codeReference == null) + { + return (null); + } + + if(!codeReference.getCodeType().equals(QCodeType.JAVA)) + { + /////////////////////////////////////////////////////////////////////////////////////// + // todo - 1) support more languages, 2) wrap them w/ java Functions here, 3) profit! // + /////////////////////////////////////////////////////////////////////////////////////// + throw (new IllegalArgumentException("Only JAVA BackendSteps are supported at this time.")); + } + + try + { + Class customizerClass = Class.forName(codeReference.getName()); + return ((T) customizerClass.getConstructor().newInstance()); + } + catch(Exception e) + { + LOG.error("Error initializing customizer: " + codeReference); + + ////////////////////////////////////////////////////////////////////////////////////////////////////////// + // return null here - under the assumption that during normal run-time operations, we'll never hit here // + // as we'll want to validate all functions in the instance validator at startup time (and IT will throw // + // if it finds an invalid code reference // + ////////////////////////////////////////////////////////////////////////////////////////////////////////// + return (null); + } + } + + + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/InsertInterface.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/InsertInterface.java index dc90de66..3eb2738d 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/InsertInterface.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/InsertInterface.java @@ -22,7 +22,6 @@ package com.kingsrook.qqq.backend.core.actions.interfaces; -import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput; @@ -32,19 +31,11 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput; ** Interface for the Insert action. ** *******************************************************************************/ -public interface InsertInterface +public interface InsertInterface extends QActionInterface { /******************************************************************************* ** *******************************************************************************/ InsertOutput execute(InsertInput insertInput) throws QException; - /******************************************************************************* - ** - *******************************************************************************/ - default QBackendTransaction openTransaction(InsertInput insertInput) throws QException - { - return (new QBackendTransaction()); - } - } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QActionInterface.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QActionInterface.java new file mode 100644 index 00000000..cac3f87f --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/interfaces/QActionInterface.java @@ -0,0 +1,23 @@ +package com.kingsrook.qqq.backend.core.actions.interfaces; + + +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput; + + +/******************************************************************************* + ** + *******************************************************************************/ +public interface QActionInterface +{ + + /******************************************************************************* + ** + *******************************************************************************/ + default QBackendTransaction openTransaction(AbstractTableActionInput input) throws QException + { + return (new QBackendTransaction()); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java index 3703a841..e3eec254 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java @@ -42,16 +42,36 @@ public class RecordPipe private ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1_000); + private boolean isTerminated = false; + + + /******************************************************************************* + ** Turn off the pipe. Stop accepting new records (just ignore them in the add + ** method). Clear the existing queue. Don't return any more records. Note that + ** if consumeAvailableRecords was running in another thread, it may still return + ** some records that it read before this call. + *******************************************************************************/ + public void terminate() + { + isTerminated = true; + queue.clear(); + } + + /******************************************************************************* ** Add a record to the pipe - ** Returns true iff the record fit in the pipe; false if the pipe is currently full. *******************************************************************************/ public void addRecord(QRecord record) { + if(isTerminated) + { + return; + } + boolean offerResult = queue.offer(record); - while(!offerResult) + while(!offerResult && !isTerminated) { LOG.debug("Record pipe.add failed (due to full pipe). Blocking."); SleepUtils.sleep(100, TimeUnit.MILLISECONDS); @@ -78,7 +98,7 @@ public class RecordPipe { List rs = new ArrayList<>(); - while(true) + while(!isTerminated) { QRecord record = queue.poll(); if(record == null) @@ -98,6 +118,11 @@ public class RecordPipe *******************************************************************************/ public int countAvailableRecords() { + if(isTerminated) + { + return (0); + } + return (queue.size()); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/query/QueryOutput.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/query/QueryOutput.java index 96340a69..9412e0d0 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/query/QueryOutput.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/query/QueryOutput.java @@ -25,8 +25,8 @@ package com.kingsrook.qqq.backend.core.model.actions.tables.query; import java.io.Serializable; import java.util.List; import java.util.function.Function; -import com.kingsrook.qqq.backend.core.actions.customizers.CustomizerLoader; import com.kingsrook.qqq.backend.core.actions.customizers.Customizers; +import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader; import com.kingsrook.qqq.backend.core.model.actions.AbstractActionOutput; import com.kingsrook.qqq.backend.core.model.data.QRecord; import org.apache.logging.log4j.LogManager; @@ -62,7 +62,7 @@ public class QueryOutput extends AbstractActionOutput implements Serializable storage = new QueryOutputList(); } - postQueryRecordCustomizer = (Function) CustomizerLoader.getTableCustomizerFunction(queryInput.getTable(), Customizers.POST_QUERY_RECORD); + postQueryRecordCustomizer = (Function) QCodeLoader.getTableCustomizerFunction(queryInput.getTable(), Customizers.POST_QUERY_RECORD); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/update/UpdateInput.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/update/UpdateInput.java index fca9ee57..adabb9f4 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/update/UpdateInput.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/tables/update/UpdateInput.java @@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.model.actions.tables.update; import java.util.List; +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput; import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.metadata.QInstance; @@ -34,7 +35,8 @@ import com.kingsrook.qqq.backend.core.model.metadata.QInstance; *******************************************************************************/ public class UpdateInput extends AbstractTableActionInput { - private List records; + private QBackendTransaction transaction; + private List records; //////////////////////////////////////////////////////////////////////////////////////////// // allow a caller to specify that they KNOW this optimization (e.g., in SQL) can be made. // @@ -65,6 +67,40 @@ public class UpdateInput extends AbstractTableActionInput + /******************************************************************************* + ** Getter for transaction + ** + *******************************************************************************/ + public QBackendTransaction getTransaction() + { + return transaction; + } + + + + /******************************************************************************* + ** Setter for transaction + ** + *******************************************************************************/ + public void setTransaction(QBackendTransaction transaction) + { + this.transaction = transaction; + } + + + + /******************************************************************************* + ** Fluent setter for transaction + ** + *******************************************************************************/ + public UpdateInput withTransaction(QBackendTransaction transaction) + { + this.transaction = transaction; + return (this); + } + + + /******************************************************************************* ** Getter for records ** diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/processes/QProcessMetaData.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/processes/QProcessMetaData.java index 07105a34..a9fa5611 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/processes/QProcessMetaData.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/processes/QProcessMetaData.java @@ -228,6 +228,16 @@ public class QProcessMetaData implements QAppChildMetaData + /******************************************************************************* + ** Wrapper to getStep, that internally casts to FrontendStepMetaData + *******************************************************************************/ + public QFrontendStepMetaData getFrontendStep(String name) + { + return (QFrontendStepMetaData) getStep(name); + } + + + /******************************************************************************* ** Get a list of all of the input fields used by all the steps in this process. *******************************************************************************/ diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java new file mode 100644 index 00000000..fbe41445 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java @@ -0,0 +1,58 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; +import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; + + +/******************************************************************************* + ** + *******************************************************************************/ +public abstract class AbstractExtractFunction implements BackendStep +{ + private RecordPipe recordPipe; + private Integer limit; + + + + /******************************************************************************* + ** + *******************************************************************************/ + public void setRecordPipe(RecordPipe recordPipe) + { + this.recordPipe = recordPipe; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public RecordPipe getRecordPipe() + { + return recordPipe; + } + + + + /******************************************************************************* + ** Getter for limit + ** + *******************************************************************************/ + public Integer getLimit() + { + return limit; + } + + + + /******************************************************************************* + ** Setter for limit + ** + *******************************************************************************/ + public void setLimit(Integer limit) + { + this.limit = limit; + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java new file mode 100644 index 00000000..500ed3f6 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java @@ -0,0 +1,85 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.util.ArrayList; +import java.util.List; +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; + + +/******************************************************************************* + ** + *******************************************************************************/ +public abstract class AbstractLoadFunction implements BackendStep +{ + private List inputRecordPage = new ArrayList<>(); + private List outputRecordPage = new ArrayList<>(); + + protected QBackendTransaction transaction; + + + + /******************************************************************************* + ** + *******************************************************************************/ + public QBackendTransaction openTransaction(RunBackendStepInput runBackendStepInput) throws QException + { + this.transaction = doOpenTransaction(runBackendStepInput); + return (transaction); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected abstract QBackendTransaction doOpenTransaction(RunBackendStepInput runBackendStepInput) throws QException; + + + + /******************************************************************************* + ** Getter for recordPage + ** + *******************************************************************************/ + public List getInputRecordPage() + { + return inputRecordPage; + } + + + + /******************************************************************************* + ** Setter for recordPage + ** + *******************************************************************************/ + public void setInputRecordPage(List inputRecordPage) + { + this.inputRecordPage = inputRecordPage; + } + + + + /******************************************************************************* + ** Getter for outputRecordPage + ** + *******************************************************************************/ + public List getOutputRecordPage() + { + return outputRecordPage; + } + + + + /******************************************************************************* + ** Setter for outputRecordPage + ** + *******************************************************************************/ + public void setOutputRecordPage(List outputRecordPage) + { + this.outputRecordPage = outputRecordPage; + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java new file mode 100644 index 00000000..c21f4c89 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java @@ -0,0 +1,61 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.util.ArrayList; +import java.util.List; +import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; +import com.kingsrook.qqq.backend.core.model.data.QRecord; + + +/******************************************************************************* + ** + *******************************************************************************/ +public abstract class AbstractTransformFunction implements BackendStep +{ + private List inputRecordPage = new ArrayList<>(); + private List outputRecordPage = new ArrayList<>(); + + + + /******************************************************************************* + ** Getter for recordPage + ** + *******************************************************************************/ + public List getInputRecordPage() + { + return inputRecordPage; + } + + + + /******************************************************************************* + ** Setter for recordPage + ** + *******************************************************************************/ + public void setInputRecordPage(List inputRecordPage) + { + this.inputRecordPage = inputRecordPage; + } + + + + /******************************************************************************* + ** Getter for outputRecordPage + ** + *******************************************************************************/ + public List getOutputRecordPage() + { + return outputRecordPage; + } + + + + /******************************************************************************* + ** Setter for outputRecordPage + ** + *******************************************************************************/ + public void setOutputRecordPage(List outputRecordPage) + { + this.outputRecordPage = outputRecordPage; + } +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java new file mode 100644 index 00000000..cd239a5f --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java @@ -0,0 +1,49 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference; + + +/******************************************************************************* + ** Base class for the StreamedETL preview & execute steps + *******************************************************************************/ +public class BaseStreamedETLStep +{ + protected static final int IN_MEMORY_RECORD_LIMIT = 20; + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected AbstractExtractFunction getExtractFunction(RunBackendStepInput runBackendStepInput) + { + QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE); + return (QCodeLoader.getBackendStep(AbstractExtractFunction.class, codeReference)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected AbstractTransformFunction getTransformFunction(RunBackendStepInput runBackendStepInput) + { + QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE); + return (QCodeLoader.getBackendStep(AbstractTransformFunction.class, codeReference)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected AbstractLoadFunction getLoadFunction(RunBackendStepInput runBackendStepInput) + { + QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE); + return (QCodeLoader.getBackendStep(AbstractLoadFunction.class, codeReference)); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java new file mode 100644 index 00000000..f484ace5 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -0,0 +1,146 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.util.ArrayList; +import java.util.List; +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop; +import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; +import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess; + + +/******************************************************************************* + ** Backend step to do a execute a streamed ETL job + *******************************************************************************/ +public class StreamedETLExecuteStep extends BaseStreamedETLStep implements BackendStep +{ + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + @SuppressWarnings("checkstyle:indentation") + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + QBackendTransaction transaction = null; + + try + { + /////////////////////////////////////////////////////// + // set up the extract, transform, and load functions // + /////////////////////////////////////////////////////// + RecordPipe recordPipe = new RecordPipe(); + AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput); + extractFunction.setRecordPipe(recordPipe); + + AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput); + AbstractLoadFunction loadFunction = getLoadFunction(runBackendStepInput); + + transaction = loadFunction.openTransaction(runBackendStepInput); + + List loadedRecordList = new ArrayList<>(); + int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractFunction", null, recordPipe, (status) -> + { + extractFunction.run(runBackendStepInput, runBackendStepOutput); + return (runBackendStepOutput); + }, + () -> (consumeRecordsFromPipe(recordPipe, transformFunction, loadFunction, runBackendStepInput, runBackendStepOutput, loadedRecordList)) + ); + + runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); + runBackendStepOutput.setRecords(loadedRecordList); + + ///////////////////// + // commit the work // + ///////////////////// + transaction.commit(); + } + catch(Exception e) + { + //////////////////////////////////////////////////////////////////////////////// + // rollback the work, then re-throw the error for up-stream to catch & report // + //////////////////////////////////////////////////////////////////////////////// + if(transaction != null) + { + transaction.rollback(); + } + throw (e); + } + finally + { + //////////////////////////////////////////////////////////// + // always close our transactions (e.g., jdbc connections) // + //////////////////////////////////////////////////////////// + if(transaction != null) + { + transaction.close(); + } + } + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, AbstractLoadFunction loadFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List loadedRecordList) throws QException + { + /////////////////////////////////// + // get the records from the pipe // + /////////////////////////////////// + List qRecords = recordPipe.consumeAvailableRecords(); + + ///////////////////////////////////////////////////// + // pass the records through the transform function // + ///////////////////////////////////////////////////// + transformFunction.setInputRecordPage(qRecords); + transformFunction.setOutputRecordPage(new ArrayList<>()); + transformFunction.run(runBackendStepInput, runBackendStepOutput); + + //////////////////////////////////////////////// + // pass the records through the load function // + //////////////////////////////////////////////// + loadFunction.setInputRecordPage(transformFunction.getOutputRecordPage()); + loadFunction.setOutputRecordPage(new ArrayList<>()); + loadFunction.run(runBackendStepInput, runBackendStepOutput); + + /////////////////////////////////////////////////////// + // copy a small number of records to the output list // + /////////////////////////////////////////////////////// + int i = 0; + while(loadedRecordList.size() < IN_MEMORY_RECORD_LIMIT && i < loadFunction.getOutputRecordPage().size()) + { + loadedRecordList.add(loadFunction.getOutputRecordPage().get(i++)); + } + + return (qRecords.size()); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java new file mode 100644 index 00000000..f9cc4143 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java @@ -0,0 +1,95 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.util.ArrayList; +import java.util.List; +import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop; +import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; +import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; + + +/******************************************************************************* + ** Backend step to do a preview of a full streamed ETL job + *******************************************************************************/ +public class StreamedETLPreviewStep extends BaseStreamedETLStep implements BackendStep +{ + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + @SuppressWarnings("checkstyle:indentation") + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + RecordPipe recordPipe = new RecordPipe(); + AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput); + extractFunction.setLimit(IN_MEMORY_RECORD_LIMIT); // todo - process field? + extractFunction.setRecordPipe(recordPipe); + + AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput); + + List transformedRecordList = new ArrayList<>(); + new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractFunction", IN_MEMORY_RECORD_LIMIT, recordPipe, (status) -> + { + extractFunction.run(runBackendStepInput, runBackendStepOutput); + return (runBackendStepOutput); + }, + () -> (consumeRecordsFromPipe(recordPipe, transformFunction, runBackendStepInput, runBackendStepOutput, transformedRecordList)) + ); + + runBackendStepOutput.setRecords(transformedRecordList); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List transformedRecordList) throws QException + { + /////////////////////////////////// + // get the records from the pipe // + /////////////////////////////////// + List qRecords = recordPipe.consumeAvailableRecords(); + + ///////////////////////////////////////////////////// + // pass the records through the transform function // + ///////////////////////////////////////////////////// + transformFunction.setInputRecordPage(qRecords); + transformFunction.run(runBackendStepInput, runBackendStepOutput); + + //////////////////////////////////////////////////// + // add the transformed records to the output list // + //////////////////////////////////////////////////// + transformedRecordList.addAll(transformFunction.getOutputRecordPage()); + + return (qRecords.size()); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java new file mode 100644 index 00000000..4b4ba456 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java @@ -0,0 +1,89 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference; +import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionInputMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData; + + +/******************************************************************************* + ** Definition for Streamed ETL process that includes a frontend. + ** + *******************************************************************************/ +public class StreamedETLWithFrontendProcess +{ + public static final String PROCESS_NAME = "etl.streamedWithFrontend"; + + public static final String STEP_NAME_PREVIEW = "preview"; + public static final String STEP_NAME_REVIEW = "review"; + public static final String STEP_NAME_EXECUTE = "execute"; + public static final String STEP_NAME_RESULT = "result"; + + public static final String FIELD_EXTRACT_CODE = "extract"; + public static final String FIELD_TRANSFORM_CODE = "transform"; + public static final String FIELD_LOAD_CODE = "load"; + + public static final String FIELD_SOURCE_TABLE = "sourceTable"; + public static final String FIELD_DESTINATION_TABLE = "destinationTable"; + public static final String FIELD_MAPPING_JSON = "mappingJSON"; + public static final String FIELD_RECORD_COUNT = "recordCount"; + + + + /******************************************************************************* + ** + *******************************************************************************/ + public QProcessMetaData defineProcessMetaData() + { + QStepMetaData previewStep = new QBackendStepMetaData() + .withName(STEP_NAME_PREVIEW) + .withCode(new QCodeReference(StreamedETLPreviewStep.class)) + .withInputData(new QFunctionInputMetaData() + .withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE)) + .withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE))); + + QFrontendStepMetaData reviewStep = new QFrontendStepMetaData() + .withName(STEP_NAME_REVIEW); + + QStepMetaData executeStep = new QBackendStepMetaData() + .withName(STEP_NAME_EXECUTE) + .withCode(new QCodeReference(StreamedETLExecuteStep.class)) + .withInputData(new QFunctionInputMetaData() + .withField(new QFieldMetaData().withName(FIELD_LOAD_CODE))); + + QFrontendStepMetaData resultStep = new QFrontendStepMetaData() + .withName(STEP_NAME_RESULT); + + return new QProcessMetaData() + .withName(PROCESS_NAME) + .addStep(previewStep) + .addStep(reviewStep) + .addStep(executeStep) + .addStep(resultStep); + } +} diff --git a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/AbstractRDBMSAction.java b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/AbstractRDBMSAction.java index feafe6c9..3ec0545f 100644 --- a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/AbstractRDBMSAction.java +++ b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/AbstractRDBMSAction.java @@ -29,6 +29,9 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import com.kingsrook.qqq.backend.core.actions.interfaces.QActionInterface; +import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput; import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria; import com.kingsrook.qqq.backend.core.model.data.QRecord; @@ -40,13 +43,18 @@ import com.kingsrook.qqq.backend.core.utils.ValueUtils; import com.kingsrook.qqq.backend.module.rdbms.jdbc.ConnectionManager; import com.kingsrook.qqq.backend.module.rdbms.model.metadata.RDBMSBackendMetaData; import com.kingsrook.qqq.backend.module.rdbms.model.metadata.RDBMSTableBackendDetails; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; /******************************************************************************* ** Base class for all core actions in the RDBMS module. *******************************************************************************/ -public abstract class AbstractRDBMSAction +public abstract class AbstractRDBMSAction implements QActionInterface { + private static final Logger LOG = LogManager.getLogger(AbstractRDBMSAction.class); + + /******************************************************************************* ** Get the table name to use in the RDBMS from a QTableMetaData. @@ -319,4 +327,26 @@ public abstract class AbstractRDBMSAction { return fieldType == QFieldType.STRING || fieldType == QFieldType.TEXT || fieldType == QFieldType.HTML || fieldType == QFieldType.PASSWORD; } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public QBackendTransaction openTransaction(AbstractTableActionInput input) throws QException + { + try + { + LOG.info("Opening transaction"); + Connection connection = getConnection(input); + + return (new RDBMSTransaction(connection)); + } + catch(Exception e) + { + throw new QException("Error opening transaction: " + e.getMessage(), e); + } + } + } diff --git a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSInsertAction.java b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSInsertAction.java index 7bdd715b..f4a08148 100644 --- a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSInsertAction.java +++ b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSInsertAction.java @@ -28,7 +28,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; -import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; @@ -160,26 +159,4 @@ public class RDBMSInsertAction extends AbstractRDBMSAction implements InsertInte } } - - - /******************************************************************************* - ** - *******************************************************************************/ - @Override - public QBackendTransaction openTransaction(InsertInput insertInput) throws QException - { - try - { - LOG.info("Opening transaction"); - Connection connection = getConnection(insertInput); - - return (new RDBMSTransaction(connection)); - } - catch(Exception e) - { - throw new QException("Error opening transaction: " + e.getMessage(), e); - } - } - - } diff --git a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSQueryAction.java b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSQueryAction.java index 566a220e..fa08c192 100644 --- a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSQueryAction.java +++ b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSQueryAction.java @@ -123,6 +123,12 @@ public class RDBMSQueryAction extends AbstractRDBMSAction implements QueryInterf } queryOutput.addRecord(record); + + if(queryInput.getAsyncJobCallback().wasCancelRequested()) + { + LOG.info("Breaking query job, as requested."); + break; + } } }), params); diff --git a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java index d7ef8739..559db22c 100644 --- a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java +++ b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java @@ -114,14 +114,37 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte outputRecords.add(outputRecord); } - try(Connection connection = getConnection(updateInput)) + try { - ///////////////////////////////////////////////////////////////////////////////////////////// - // process each distinct list of fields being updated (e.g., each different SQL statement) // - ///////////////////////////////////////////////////////////////////////////////////////////// - for(List fieldsBeingUpdated : recordsByFieldBeingUpdated.keySet()) + Connection connection; + boolean needToCloseConnection = false; + if(updateInput.getTransaction() != null && updateInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction) { - updateRecordsWithMatchingListOfFields(updateInput, connection, table, recordsByFieldBeingUpdated.get(fieldsBeingUpdated), fieldsBeingUpdated); + LOG.debug("Using connection from insertInput [" + rdbmsTransaction.getConnection() + "]"); + connection = rdbmsTransaction.getConnection(); + } + else + { + connection = getConnection(updateInput); + needToCloseConnection = true; + } + + try + { + ///////////////////////////////////////////////////////////////////////////////////////////// + // process each distinct list of fields being updated (e.g., each different SQL statement) // + ///////////////////////////////////////////////////////////////////////////////////////////// + for(List fieldsBeingUpdated : recordsByFieldBeingUpdated.keySet()) + { + updateRecordsWithMatchingListOfFields(updateInput, connection, table, recordsByFieldBeingUpdated.get(fieldsBeingUpdated), fieldsBeingUpdated); + } + } + finally + { + if(needToCloseConnection) + { + connection.close(); + } } return rs; @@ -191,7 +214,6 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte - /******************************************************************************* ** *******************************************************************************/ @@ -276,6 +298,8 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte return (true); } + + /******************************************************************************* ** *******************************************************************************/ @@ -285,5 +309,4 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte updateInput.getAsyncJobCallback().updateStatus(statusCounter, updateInput.getRecords().size()); } - }