diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java index 554ec0a1..a15948f3 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java @@ -12,7 +12,10 @@ import org.apache.logging.log4j.Logger; /******************************************************************************* + ** Class that knows how to Run an asynchronous job (lambda, supplier) that writes into a + ** RecordPipe, with another lambda (consumer) that consumes records from the pipe. ** + ** Takes care of the job status monitoring, blocking when the pipe is empty, etc. *******************************************************************************/ public class AsyncRecordPipeLoop { @@ -20,22 +23,32 @@ public class AsyncRecordPipeLoop private static final int TIMEOUT_AFTER_NO_RECORDS_MS = 10 * 60 * 1000; - private static final int MAX_SLEEP_MS = 1000; - private static final int INIT_SLEEP_MS = 10; + private static final int MAX_SLEEP_MS = 1000; + private static final int INIT_SLEEP_MS = 10; + private static final int MIN_RECORDS_TO_CONSUME = 10; /******************************************************************************* + ** Run an async-record-pipe-loop. ** + ** @param jobName name for the async job thread + ** @param recordLimit optionally, cancel the supplier/job after this number of records. + * e.g., for a preview step. + ** @param recordPipe constructed before this call, and used in both of the lambdas + ** @param supplier lambda that adds records into the pipe. + * e.g., a query or extract step. + ** @param consumer lambda that consumes records from the pipe + * e.g., a transform/load step. *******************************************************************************/ - public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction job, UnsafeSupplier consumer) throws QException + public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction supplier, UnsafeSupplier consumer) throws QException { /////////////////////////////////////////////////// // start the extraction function as an async job // /////////////////////////////////////////////////// AsyncJobManager asyncJobManager = new AsyncJobManager(); - String jobUUID = asyncJobManager.startJob(jobName, job::apply); - LOG.info("Started job [" + jobUUID + "] for record pipe streaming"); + String jobUUID = asyncJobManager.startJob(jobName, supplier::apply); + LOG.info("Started supplier job [" + jobUUID + "] for record pipe."); AsyncJobState jobState = AsyncJobState.RUNNING; AsyncJobStatus asyncJobStatus = null; @@ -47,13 +60,13 @@ public class AsyncRecordPipeLoop while(jobState.equals(AsyncJobState.RUNNING)) { - if(recordPipe.countAvailableRecords() == 0) + if(recordPipe.countAvailableRecords() < MIN_RECORDS_TO_CONSUME) { - /////////////////////////////////////////////////////////// - // if the pipe is empty, sleep to let the producer work. // - // todo - smarter sleep? like get notified vs. sleep? // - /////////////////////////////////////////////////////////// - LOG.info("No records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work"); + /////////////////////////////////////////////////////////////// + // if the pipe is too empty, sleep to let the producer work. // + // todo - smarter sleep? like get notified vs. sleep? // + /////////////////////////////////////////////////////////////// + LOG.debug("Too few records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work"); SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS); nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS); @@ -114,9 +127,9 @@ public class AsyncRecordPipeLoop throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException())); } - ////////////////////////////////////////////////////// - // send the final records to transform & load steps // - ////////////////////////////////////////////////////// + //////////////////////////////////////////// + // send the final records to the consumer // + //////////////////////////////////////////// recordCount += consumer.get(); long endTime = System.currentTimeMillis(); diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/QProcessCallback.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/QProcessCallback.java index ce64dfad..95fe9415 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/QProcessCallback.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/QProcessCallback.java @@ -39,10 +39,16 @@ public interface QProcessCallback /******************************************************************************* ** Get the filter query for this callback. *******************************************************************************/ - QQueryFilter getQueryFilter(); + default QQueryFilter getQueryFilter() + { + return (null); + } /******************************************************************************* ** Get the field values for this callback. *******************************************************************************/ - Map getFieldValues(List fields); + default Map getFieldValues(List fields) + { + return (null); + } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamed/StreamedETLBackendStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamed/StreamedETLBackendStep.java index ce2cfd7b..04e3c199 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamed/StreamedETLBackendStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamed/StreamedETLBackendStep.java @@ -23,12 +23,8 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed; import java.util.List; -import java.util.Optional; -import java.util.concurrent.TimeUnit; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; -import com.kingsrook.qqq.backend.core.actions.async.AsyncJobManager; -import com.kingsrook.qqq.backend.core.actions.async.AsyncJobState; -import com.kingsrook.qqq.backend.core.actions.async.AsyncJobStatus; +import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; @@ -41,7 +37,6 @@ import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicE import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLLoadFunction; import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLProcess; import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLTransformFunction; -import com.kingsrook.qqq.backend.core.utils.SleepUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -64,6 +59,7 @@ public class StreamedETLBackendStep implements BackendStep ** *******************************************************************************/ @Override + @SuppressWarnings("checkstyle:indentation") public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException { QBackendTransaction transaction = openTransaction(runBackendStepInput); @@ -74,96 +70,23 @@ public class StreamedETLBackendStep implements BackendStep BasicETLExtractFunction basicETLExtractFunction = new BasicETLExtractFunction(); basicETLExtractFunction.setRecordPipe(recordPipe); - ////////////////////////////////////////// - // run the query action as an async job // - ////////////////////////////////////////// - AsyncJobManager asyncJobManager = new AsyncJobManager(); - String queryJobUUID = asyncJobManager.startJob("StreamedETL>QueryAction", (status) -> - { - basicETLExtractFunction.run(runBackendStepInput, runBackendStepOutput); - return (runBackendStepOutput); - }); - LOG.info("Started query job [" + queryJobUUID + "] for streamed ETL"); - - AsyncJobState queryJobState = AsyncJobState.RUNNING; - AsyncJobStatus asyncJobStatus = null; - - long recordCount = 0; - int nextSleepMillis = INIT_SLEEP_MS; - long lastReceivedRecordsAt = System.currentTimeMillis(); - long jobStartTime = System.currentTimeMillis(); - - while(queryJobState.equals(AsyncJobState.RUNNING)) - { - if(recordPipe.countAvailableRecords() == 0) + //////////////////////////////////// + // run the async-record-pipe loop // + //////////////////////////////////// + int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Extract", null, recordPipe, (status) -> { - /////////////////////////////////////////////////////////// - // if the pipe is empty, sleep to let the producer work. // - // todo - smarter sleep? like get notified vs. sleep? // - /////////////////////////////////////////////////////////// - LOG.info("No records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work"); - SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS); - nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS); + basicETLExtractFunction.run(runBackendStepInput, runBackendStepOutput); + return (runBackendStepOutput); + }, + () -> (consumeRecordsFromPipe(recordPipe, runBackendStepInput, runBackendStepOutput, transaction)) + ); - long timeSinceLastReceivedRecord = System.currentTimeMillis() - lastReceivedRecordsAt; - if(timeSinceLastReceivedRecord > TIMEOUT_AFTER_NO_RECORDS_MS) - { - throw (new QException("Query action appears to have stopped producing records (last record received " + timeSinceLastReceivedRecord + " ms ago).")); - } - } - else - { - //////////////////////////////////////////////////////////////////////////////////////////////////////// - // if the pipe has records, consume them. reset the sleep timer so if we sleep again it'll be short. // - //////////////////////////////////////////////////////////////////////////////////////////////////////// - lastReceivedRecordsAt = System.currentTimeMillis(); - nextSleepMillis = INIT_SLEEP_MS; - - recordCount += consumeRecordsFromPipe(recordPipe, runBackendStepInput, runBackendStepOutput, transaction); - - LOG.info(String.format("Processed %,d records so far", recordCount)); - } - - //////////////////////////////////// - // refresh the query job's status // - //////////////////////////////////// - Optional optionalAsyncJobStatus = asyncJobManager.getJobStatus(queryJobUUID); - if(optionalAsyncJobStatus.isEmpty()) - { - ///////////////////////////////////////////////// - // todo - ... maybe some version of try-again? // - ///////////////////////////////////////////////// - throw (new QException("Could not get status of report query job [" + queryJobUUID + "]")); - } - asyncJobStatus = optionalAsyncJobStatus.get(); - queryJobState = asyncJobStatus.getState(); - } - - LOG.info("Query job [" + queryJobUUID + "] for ETL completed with status: " + asyncJobStatus); - - ///////////////////////////////////////// - // propagate errors from the query job // - ///////////////////////////////////////// - if(asyncJobStatus.getState().equals(AsyncJobState.ERROR)) - { - throw (new QException("Query job failed with an error", asyncJobStatus.getCaughtException())); - } - - ////////////////////////////////////////////////////// - // send the final records to transform & load steps // - ////////////////////////////////////////////////////// - recordCount += consumeRecordsFromPipe(recordPipe, runBackendStepInput, runBackendStepOutput, transaction); + runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); ///////////////////// // commit the work // ///////////////////// transaction.commit(); - - long reportEndTime = System.currentTimeMillis(); - LOG.info(String.format("Processed %,d records", recordCount) - + String.format(" at end of ETL job in %,d ms (%.2f records/second).", (reportEndTime - jobStartTime), 1000d * (recordCount / (.001d + (reportEndTime - jobStartTime))))); - - runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); } catch(Exception e) { diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractStep.java similarity index 58% rename from qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java rename to qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractStep.java index fbe41445..5d4b5c2a 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractFunction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractExtractStep.java @@ -3,18 +3,41 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; /******************************************************************************* + ** Base class for the Extract logic of Streamed ETL processes. ** + ** These steps are invoked by both the "preview" and the "execute" steps of a + ** StreamedETLWithFrontend process. + ** + ** Key here, is that subclasses here should put records that they're "Extracting" + ** into the recordPipe member. That is to say, DO NOT use the recordList in + ** the Step input/output objects. + ** + ** Ideally, they'll also stop once they've hit the "limit" number of records + ** (though if you keep going, the pipe will get terminated and the job will be + ** cancelled, etc...). *******************************************************************************/ -public abstract class AbstractExtractFunction implements BackendStep +public abstract class AbstractExtractStep implements BackendStep { private RecordPipe recordPipe; private Integer limit; + /******************************************************************************* + ** + *******************************************************************************/ + public Integer doCount(RunBackendStepInput runBackendStepInput) throws QException + { + return (null); + } + + + /******************************************************************************* ** *******************************************************************************/ diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java similarity index 67% rename from qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java rename to qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java index 500ed3f6..6f70f7e4 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadFunction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java @@ -3,6 +3,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit import java.util.ArrayList; import java.util.List; +import java.util.Optional; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.exceptions.QException; @@ -11,35 +12,33 @@ import com.kingsrook.qqq.backend.core.model.data.QRecord; /******************************************************************************* + ** Base class for the Load (aka, store) logic of Streamed ETL processes. ** + ** Records are to be read out of the inputRecordPage field, and after storing, + ** should be written to the outputRecordPage. That is to say, DO NOT use the + ** recordList in the step input/output objects. + ** + ** Also - use the transaction member variable - though be aware, it *******************************************************************************/ -public abstract class AbstractLoadFunction implements BackendStep +public abstract class AbstractLoadStep implements BackendStep { - private List inputRecordPage = new ArrayList<>(); + private List inputRecordPage = new ArrayList<>(); private List outputRecordPage = new ArrayList<>(); - protected QBackendTransaction transaction; + private Optional transaction = Optional.empty(); /******************************************************************************* ** *******************************************************************************/ - public QBackendTransaction openTransaction(RunBackendStepInput runBackendStepInput) throws QException + public Optional openTransaction(RunBackendStepInput runBackendStepInput) throws QException { - this.transaction = doOpenTransaction(runBackendStepInput); - return (transaction); + return (Optional.empty()); } - /******************************************************************************* - ** - *******************************************************************************/ - protected abstract QBackendTransaction doOpenTransaction(RunBackendStepInput runBackendStepInput) throws QException; - - - /******************************************************************************* ** Getter for recordPage ** @@ -82,4 +81,25 @@ public abstract class AbstractLoadFunction implements BackendStep this.outputRecordPage = outputRecordPage; } + + + /******************************************************************************* + ** Setter for transaction + ** + *******************************************************************************/ + public void setTransaction(Optional transaction) + { + this.transaction = transaction; + } + + + + /******************************************************************************* + ** Getter for transaction + ** + *******************************************************************************/ + public Optional getTransaction() + { + return (transaction); + } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java similarity index 84% rename from qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java rename to qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java index c21f4c89..8f97390c 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformFunction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java @@ -8,9 +8,13 @@ import com.kingsrook.qqq.backend.core.model.data.QRecord; /******************************************************************************* + ** Base class for the Transform logic of Streamed ETL processes. ** + ** Records are to be read out of the inputRecordPage field, and after transformation, + ** should be written to the outputRecordPage. That is to say, DO NOT use the + ** recordList in the step input/output objects. *******************************************************************************/ -public abstract class AbstractTransformFunction implements BackendStep +public abstract class AbstractTransformStep implements BackendStep { private List inputRecordPage = new ArrayList<>(); private List outputRecordPage = new ArrayList<>(); diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java index cd239a5f..75c71501 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/BaseStreamedETLStep.java @@ -11,17 +11,17 @@ import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference; *******************************************************************************/ public class BaseStreamedETLStep { - protected static final int IN_MEMORY_RECORD_LIMIT = 20; + protected static final int PROCESS_OUTPUT_RECORD_LIST_LIMIT = 20; /******************************************************************************* ** *******************************************************************************/ - protected AbstractExtractFunction getExtractFunction(RunBackendStepInput runBackendStepInput) + protected AbstractExtractStep getExtractStep(RunBackendStepInput runBackendStepInput) { QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE); - return (QCodeLoader.getBackendStep(AbstractExtractFunction.class, codeReference)); + return (QCodeLoader.getBackendStep(AbstractExtractStep.class, codeReference)); } @@ -29,10 +29,10 @@ public class BaseStreamedETLStep /******************************************************************************* ** *******************************************************************************/ - protected AbstractTransformFunction getTransformFunction(RunBackendStepInput runBackendStepInput) + protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput) { QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE); - return (QCodeLoader.getBackendStep(AbstractTransformFunction.class, codeReference)); + return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference)); } @@ -40,10 +40,10 @@ public class BaseStreamedETLStep /******************************************************************************* ** *******************************************************************************/ - protected AbstractLoadFunction getLoadFunction(RunBackendStepInput runBackendStepInput) + protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput) { QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE); - return (QCodeLoader.getBackendStep(AbstractLoadFunction.class, codeReference)); + return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference)); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/ExtractViaQueryStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/ExtractViaQueryStep.java new file mode 100644 index 00000000..f843668d --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/ExtractViaQueryStep.java @@ -0,0 +1,125 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.io.IOException; +import java.io.Serializable; +import com.kingsrook.qqq.backend.core.actions.tables.CountAction; +import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput; +import com.kingsrook.qqq.backend.core.utils.JsonUtils; + + +/******************************************************************************* + ** Generic implementation of an ExtractStep - that runs a Query action for a + ** specified table. + ** + ** If a query is specified from the caller (e.g., using the process Callback + ** mechanism), that will be used. Else a filter (object or json) in + ** StreamedETLWithFrontendProcess.FIELD_DEFAULT_QUERY_FILTER will be checked. + *******************************************************************************/ +public class ExtractViaQueryStep extends AbstractExtractStep +{ + public static final String FIELD_SOURCE_TABLE = "sourceTable"; + + + + /******************************************************************************* + ** Execute the backend step - using the request as input, and the result as output. + ** + *******************************************************************************/ + @Override + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance()); + queryInput.setSession(runBackendStepInput.getSession()); + queryInput.setTableName(runBackendStepInput.getValueString(FIELD_SOURCE_TABLE)); + queryInput.setFilter(getQueryFilter(runBackendStepInput)); + queryInput.setRecordPipe(getRecordPipe()); + queryInput.setLimit(getLimit()); + new QueryAction().execute(queryInput); + + /////////////////////////////////////////////////////////////////// + // output is done into the pipe - so, nothing for us to do here. // + /////////////////////////////////////////////////////////////////// + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public Integer doCount(RunBackendStepInput runBackendStepInput) throws QException + { + CountInput countInput = new CountInput(runBackendStepInput.getInstance()); + countInput.setSession(runBackendStepInput.getSession()); + countInput.setTableName(runBackendStepInput.getValueString(FIELD_SOURCE_TABLE)); + countInput.setFilter(getQueryFilter(runBackendStepInput)); + CountOutput countOutput = new CountAction().execute(countInput); + return (countOutput.getCount()); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected QQueryFilter getQueryFilter(RunBackendStepInput runBackendStepInput) throws QException + { + ////////////////////////////////////////////////////////////////////////////////////// + // if the queryFilterJson field is populated, read the filter from it and return it // + ////////////////////////////////////////////////////////////////////////////////////// + String queryFilterJson = runBackendStepInput.getValueString("queryFilterJson"); + if(queryFilterJson != null) + { + try + { + return (JsonUtils.toObject(queryFilterJson, QQueryFilter.class)); + } + catch(IOException e) + { + throw new QException("Error loading query filter from json field", e); + } + } + else if(runBackendStepInput.getCallback() != null && runBackendStepInput.getCallback().getQueryFilter() != null) + { + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // else, try to get filter from process callback. if we got one, store it as a process value for later steps // + //////////////////////////////////////////////////////////////////////////////////////////////////////////////// + QQueryFilter queryFilter = runBackendStepInput.getCallback().getQueryFilter(); + runBackendStepInput.addValue("queryFilterJson", JsonUtils.toJson(queryFilter)); + return (queryFilter); + } + else + { + ///////////////////////////////////////////////////// + // else, see if a defaultQueryFilter was specified // + ///////////////////////////////////////////////////// + Serializable defaultQueryFilter = runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_DEFAULT_QUERY_FILTER); + if(defaultQueryFilter instanceof QQueryFilter filter) + { + return (filter); + } + if(defaultQueryFilter instanceof String string) + { + try + { + return (JsonUtils.toObject(string, QQueryFilter.class)); + } + catch(IOException e) + { + throw new QException("Error loading default query filter from json", e); + } + } + } + + throw (new QException("Could not find query filter for Extract step.")); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertStep.java new file mode 100644 index 00000000..10c0567f --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertStep.java @@ -0,0 +1,54 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.util.Optional; +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput; + + +/******************************************************************************* + ** Generic implementation of a LoadStep - that runs an Insert action for a + ** specified table. + *******************************************************************************/ +public class LoadViaInsertStep extends AbstractLoadStep +{ + public static final String FIELD_DESTINATION_TABLE = "destinationTable"; + + + + /******************************************************************************* + ** Execute the backend step - using the request as input, and the result as output. + ** + *******************************************************************************/ + @Override + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance()); + insertInput.setSession(runBackendStepInput.getSession()); + insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE)); + insertInput.setRecords(getInputRecordPage()); + getTransaction().ifPresent(insertInput::setTransaction); + InsertOutput insertOutput = new InsertAction().execute(insertInput); + getOutputRecordPage().addAll(insertOutput.getRecords()); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public Optional openTransaction(RunBackendStepInput runBackendStepInput) throws QException + { + InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance()); + insertInput.setSession(runBackendStepInput.getSession()); + insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE)); + + return (Optional.of(new InsertAction().openTransaction(insertInput))); + } +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaUpdateStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaUpdateStep.java new file mode 100644 index 00000000..6c34b623 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaUpdateStep.java @@ -0,0 +1,56 @@ +package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; + + +import java.util.Optional; +import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; +import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateOutput; + + +/******************************************************************************* + ** Generic implementation of a LoadStep - that runs an Update action for a + ** specified table. + *******************************************************************************/ +public class LoadViaUpdateStep extends AbstractLoadStep +{ + public static final String FIELD_DESTINATION_TABLE = "destinationTable"; + + + + /******************************************************************************* + ** Execute the backend step - using the request as input, and the result as output. + ** + *******************************************************************************/ + @Override + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + UpdateInput updateInput = new UpdateInput(runBackendStepInput.getInstance()); + updateInput.setSession(runBackendStepInput.getSession()); + updateInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE)); + updateInput.setRecords(getInputRecordPage()); + getTransaction().ifPresent(updateInput::setTransaction); + UpdateOutput updateOutput = new UpdateAction().execute(updateInput); + getOutputRecordPage().addAll(updateOutput.getRecords()); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public Optional openTransaction(RunBackendStepInput runBackendStepInput) throws QException + { + InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance()); + insertInput.setSession(runBackendStepInput.getSession()); + insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE)); + + return (Optional.of(new InsertAction().openTransaction(insertInput))); + } +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index 2a00a5cc..f8d2f679 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit import java.util.ArrayList; import java.util.List; +import java.util.Optional; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; @@ -36,10 +37,14 @@ import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.Str /******************************************************************************* - ** Backend step to do a execute a streamed ETL job + ** Backend step to do the execute portion of a streamed ETL job. + ** + ** Works within a transaction (per the backend module of the destination table). *******************************************************************************/ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements BackendStep { + private int currentRowCount = 1; + /******************************************************************************* @@ -49,29 +54,30 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe @SuppressWarnings("checkstyle:indentation") public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException { - QBackendTransaction transaction = null; + Optional transaction = Optional.empty(); try { /////////////////////////////////////////////////////// // set up the extract, transform, and load functions // /////////////////////////////////////////////////////// - RecordPipe recordPipe = new RecordPipe(); - AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput); - extractFunction.setRecordPipe(recordPipe); + RecordPipe recordPipe = new RecordPipe(); + AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); + extractStep.setRecordPipe(recordPipe); - AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput); - AbstractLoadFunction loadFunction = getLoadFunction(runBackendStepInput); + AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); + AbstractLoadStep loadStep = getLoadStep(runBackendStepInput); - transaction = loadFunction.openTransaction(runBackendStepInput); + transaction = loadStep.openTransaction(runBackendStepInput); + loadStep.setTransaction(transaction); List loadedRecordList = new ArrayList<>(); - int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractFunction", null, recordPipe, (status) -> + int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) -> { - extractFunction.run(runBackendStepInput, runBackendStepOutput); + extractStep.run(runBackendStepInput, runBackendStepOutput); return (runBackendStepOutput); }, - () -> (consumeRecordsFromPipe(recordPipe, transformFunction, loadFunction, runBackendStepInput, runBackendStepOutput, loadedRecordList)) + () -> (consumeRecordsFromPipe(recordPipe, transformStep, loadStep, runBackendStepInput, runBackendStepOutput, loadedRecordList)) ); runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); @@ -80,9 +86,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe ///////////////////// // commit the work // ///////////////////// - if(transaction != null) + if(transaction.isPresent()) { - transaction.commit(); + transaction.get().commit(); } } catch(Exception e) @@ -90,9 +96,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe //////////////////////////////////////////////////////////////////////////////// // rollback the work, then re-throw the error for up-stream to catch & report // //////////////////////////////////////////////////////////////////////////////// - if(transaction != null) + if(transaction.isPresent()) { - transaction.rollback(); + transaction.get().rollback(); } throw (e); } @@ -101,9 +107,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe //////////////////////////////////////////////////////////// // always close our transactions (e.g., jdbc connections) // //////////////////////////////////////////////////////////// - if(transaction != null) + if(transaction.isPresent()) { - transaction.close(); + transaction.get().close(); } } } @@ -113,8 +119,14 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe /******************************************************************************* ** *******************************************************************************/ - private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, AbstractLoadFunction loadFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List loadedRecordList) throws QException + private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, AbstractLoadStep loadStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List loadedRecordList) throws QException { + Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLProcess.FIELD_RECORD_COUNT); + if(totalRows != null) + { + runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows); + } + /////////////////////////////////// // get the records from the pipe // /////////////////////////////////// @@ -123,26 +135,27 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe ///////////////////////////////////////////////////// // pass the records through the transform function // ///////////////////////////////////////////////////// - transformFunction.setInputRecordPage(qRecords); - transformFunction.setOutputRecordPage(new ArrayList<>()); - transformFunction.run(runBackendStepInput, runBackendStepOutput); + transformStep.setInputRecordPage(qRecords); + transformStep.setOutputRecordPage(new ArrayList<>()); + transformStep.run(runBackendStepInput, runBackendStepOutput); //////////////////////////////////////////////// // pass the records through the load function // //////////////////////////////////////////////// - loadFunction.setInputRecordPage(transformFunction.getOutputRecordPage()); - loadFunction.setOutputRecordPage(new ArrayList<>()); - loadFunction.run(runBackendStepInput, runBackendStepOutput); + loadStep.setInputRecordPage(transformStep.getOutputRecordPage()); + loadStep.setOutputRecordPage(new ArrayList<>()); + loadStep.run(runBackendStepInput, runBackendStepOutput); /////////////////////////////////////////////////////// // copy a small number of records to the output list // /////////////////////////////////////////////////////// int i = 0; - while(loadedRecordList.size() < IN_MEMORY_RECORD_LIMIT && i < loadFunction.getOutputRecordPage().size()) + while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < loadStep.getOutputRecordPage().size()) { - loadedRecordList.add(loadFunction.getOutputRecordPage().get(i++)); + loadedRecordList.add(loadStep.getOutputRecordPage().get(i++)); } + currentRowCount += qRecords.size(); return (qRecords.size()); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java index f9cc4143..ee920b3c 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLPreviewStep.java @@ -31,6 +31,7 @@ import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess; /******************************************************************************* @@ -47,20 +48,26 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe @SuppressWarnings("checkstyle:indentation") public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException { - RecordPipe recordPipe = new RecordPipe(); - AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput); - extractFunction.setLimit(IN_MEMORY_RECORD_LIMIT); // todo - process field? - extractFunction.setRecordPipe(recordPipe); + RecordPipe recordPipe = new RecordPipe(); + AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); + extractStep.setLimit(PROCESS_OUTPUT_RECORD_LIST_LIMIT); // todo - make this an input? + extractStep.setRecordPipe(recordPipe); - AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput); + /////////////////////////////////////////// + // request a count from the extract step // + /////////////////////////////////////////// + Integer recordCount = extractStep.doCount(runBackendStepInput); + runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); + + AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); List transformedRecordList = new ArrayList<>(); - new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractFunction", IN_MEMORY_RECORD_LIMIT, recordPipe, (status) -> + new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) -> { - extractFunction.run(runBackendStepInput, runBackendStepOutput); + extractStep.run(runBackendStepInput, runBackendStepOutput); return (runBackendStepOutput); }, - () -> (consumeRecordsFromPipe(recordPipe, transformFunction, runBackendStepInput, runBackendStepOutput, transformedRecordList)) + () -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, transformedRecordList)) ); runBackendStepOutput.setRecords(transformedRecordList); @@ -71,7 +78,7 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe /******************************************************************************* ** *******************************************************************************/ - private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List transformedRecordList) throws QException + private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List transformedRecordList) throws QException { /////////////////////////////////// // get the records from the pipe // @@ -81,13 +88,13 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe ///////////////////////////////////////////////////// // pass the records through the transform function // ///////////////////////////////////////////////////// - transformFunction.setInputRecordPage(qRecords); - transformFunction.run(runBackendStepInput, runBackendStepOutput); + transformStep.setInputRecordPage(qRecords); + transformStep.run(runBackendStepInput, runBackendStepOutput); //////////////////////////////////////////////////// // add the transformed records to the output list // //////////////////////////////////////////////////// - transformedRecordList.addAll(transformFunction.getOutputRecordPage()); + transformedRecordList.addAll(transformStep.getOutputRecordPage()); return (qRecords.size()); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java index 4b4ba456..20d33147 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java @@ -34,6 +34,20 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData; /******************************************************************************* ** Definition for Streamed ETL process that includes a frontend. ** + ** This process uses 2 backend steps, and 2 frontend steps, as follows: + ** - preview (backend) - does just a little work (limited # of rows), to give the + ** user a preview of what the final result will be - e.g., some data to seed the review screen + ** - review (frontend) - a review screen + ** - execute (backend) - processes all the rows, does all the work. + ** - result (frontend) - a result screen + ** + ** The preview & execute steps use additional BackendStep codes: + ** - Extract - gets the rows to be processed. Used in preview (but only for a + ** limited number of rows), and execute (without limit) + ** - Transform - do whatever transformation is needed to the rows. Done on preview + ** and execute. Always works with a "page" of records at a time. + ** - Load - store the records into the backend, as appropriate. Always works + ** with a "page" of records at a time. *******************************************************************************/ public class StreamedETLWithFrontendProcess { @@ -48,24 +62,31 @@ public class StreamedETLWithFrontendProcess public static final String FIELD_TRANSFORM_CODE = "transform"; public static final String FIELD_LOAD_CODE = "load"; - public static final String FIELD_SOURCE_TABLE = "sourceTable"; - public static final String FIELD_DESTINATION_TABLE = "destinationTable"; - public static final String FIELD_MAPPING_JSON = "mappingJSON"; - public static final String FIELD_RECORD_COUNT = "recordCount"; + public static final String FIELD_SOURCE_TABLE = "sourceTable"; + public static final String FIELD_DEFAULT_QUERY_FILTER = "defaultQueryFilter"; + public static final String FIELD_DESTINATION_TABLE = "destinationTable"; /******************************************************************************* ** *******************************************************************************/ - public QProcessMetaData defineProcessMetaData() + public QProcessMetaData defineProcessMetaData( + String sourceTableName, + String destinationTableName, + Class extractStepClass, + Class transformStepClass, + Class loadStepClass + ) { QStepMetaData previewStep = new QBackendStepMetaData() .withName(STEP_NAME_PREVIEW) .withCode(new QCodeReference(StreamedETLPreviewStep.class)) .withInputData(new QFunctionInputMetaData() - .withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE)) - .withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE))); + .withField(new QFieldMetaData().withName(FIELD_SOURCE_TABLE).withDefaultValue(sourceTableName)) + .withField(new QFieldMetaData().withName(FIELD_DEFAULT_QUERY_FILTER)) + .withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE).withDefaultValue(new QCodeReference(extractStepClass))) + .withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE).withDefaultValue(new QCodeReference(transformStepClass)))); QFrontendStepMetaData reviewStep = new QFrontendStepMetaData() .withName(STEP_NAME_REVIEW); @@ -74,7 +95,8 @@ public class StreamedETLWithFrontendProcess .withName(STEP_NAME_EXECUTE) .withCode(new QCodeReference(StreamedETLExecuteStep.class)) .withInputData(new QFunctionInputMetaData() - .withField(new QFieldMetaData().withName(FIELD_LOAD_CODE))); + .withField(new QFieldMetaData().withName(FIELD_DESTINATION_TABLE).withDefaultValue(destinationTableName)) + .withField(new QFieldMetaData().withName(FIELD_LOAD_CODE).withDefaultValue(new QCodeReference(loadStepClass)))); QFrontendStepMetaData resultStep = new QFrontendStepMetaData() .withName(STEP_NAME_RESULT); diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcessTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcessTest.java index 73d8792e..ad63741e 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcessTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcessTest.java @@ -1,23 +1,22 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; +import java.io.Serializable; import java.util.List; -import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; +import java.util.Map; +import com.kingsrook.qqq.backend.core.actions.processes.QProcessCallback; import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction; -import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; -import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; -import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; -import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.metadata.QInstance; -import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference; import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess; import com.kingsrook.qqq.backend.core.utils.TestUtils; import org.junit.jupiter.api.Test; import static org.assertj.core.api.Assertions.assertThat; @@ -32,65 +31,47 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class StreamedETLWithFrontendProcessTest { + /******************************************************************************* ** *******************************************************************************/ @Test - void test() throws QException + void testSimpleSmallQueryTransformInsert() throws QException { - QProcessMetaData process = new StreamedETLWithFrontendProcess().defineProcessMetaData(); - process.setTableName(TestUtils.TABLE_NAME_SHAPE); - - for(QFieldMetaData inputField : process.getInputFields()) - { - if(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE.equals(inputField.getName())) - { - inputField.setDefaultValue(new QCodeReference(TestExtractStep.class)); - } - else if(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE.equals(inputField.getName())) - { - inputField.setDefaultValue(new QCodeReference(TestTransformStep.class)); - } - else if(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE.equals(inputField.getName())) - { - inputField.setDefaultValue(new QCodeReference(TestLoadStep.class)); - } - } - QInstance instance = TestUtils.defineInstance(); + + //////////////////////////////////////////////////////// + // define the process - an ELT from Shapes to Persons // + //////////////////////////////////////////////////////// + QProcessMetaData process = new StreamedETLWithFrontendProcess().defineProcessMetaData(TestUtils.TABLE_NAME_SHAPE, TestUtils.TABLE_NAME_PERSON, ExtractViaQueryStep.class, TestTransformStep.class, LoadViaInsertStep.class); + process.setTableName(TestUtils.TABLE_NAME_SHAPE); instance.addProcess(process); - InsertInput insertInput = new InsertInput(instance); - insertInput.setSession(TestUtils.getMockSession()); - insertInput.setTableName(TestUtils.TABLE_NAME_SHAPE); - insertInput.setRecords(List.of( - new QRecord().withTableName(TestUtils.TABLE_NAME_SHAPE).withValue("id", 1).withValue("name", "Circle"), - new QRecord().withTableName(TestUtils.TABLE_NAME_SHAPE).withValue("id", 2).withValue("name", "Triangle"), - new QRecord().withTableName(TestUtils.TABLE_NAME_SHAPE).withValue("id", 3).withValue("name", "Square") - )); - new InsertAction().execute(insertInput); + /////////////////////////////////////////////////////// + // switch the person table to use the memory backend // + /////////////////////////////////////////////////////// + instance.getTable(TestUtils.TABLE_NAME_PERSON).setBackendName(TestUtils.MEMORY_BACKEND_NAME); - List preList = TestUtils.queryTable(TestUtils.TABLE_NAME_SHAPE); + TestUtils.insertDefaultShapes(instance); + ///////////////////// + // run the process // + ///////////////////// RunProcessInput request = new RunProcessInput(instance); request.setSession(TestUtils.getMockSession()); request.setProcessName(process.getName()); request.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); + request.setCallback(new Callback()); RunProcessOutput result = new RunProcessAction().execute(request); assertNotNull(result); - assertTrue(result.getException().isEmpty()); - List postList = TestUtils.queryTable(TestUtils.TABLE_NAME_SHAPE); - assertEquals(6, postList.size()); + List postList = TestUtils.queryTable(instance, TestUtils.TABLE_NAME_PERSON); assertThat(postList) - .anyMatch(qr -> qr.getValue("name").equals("Circle")) - .anyMatch(qr -> qr.getValue("name").equals("Triangle")) - .anyMatch(qr -> qr.getValue("name").equals("Square")) - .anyMatch(qr -> qr.getValue("name").equals("Transformed: Circle")) - .anyMatch(qr -> qr.getValue("name").equals("Transformed: Triangle")) - .anyMatch(qr -> qr.getValue("name").equals("Transformed: Square")); + .as("Should have inserted Circle").anyMatch(qr -> qr.getValue("lastName").equals("Circle")) + .as("Should have inserted Triangle").anyMatch(qr -> qr.getValue("lastName").equals("Triangle")) + .as("Should have inserted Square").anyMatch(qr -> qr.getValue("lastName").equals("Square")); } @@ -98,22 +79,34 @@ class StreamedETLWithFrontendProcessTest /******************************************************************************* ** *******************************************************************************/ - public static class TestExtractStep extends AbstractExtractFunction + @Test + void testBig() throws QException { + QInstance instance = TestUtils.defineInstance(); - /******************************************************************************* - ** Execute the backend step - using the request as input, and the result as output. - ** - *******************************************************************************/ - @Override - public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException - { - QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance()); - queryInput.setSession(runBackendStepInput.getSession()); - queryInput.setTableName(TestUtils.TABLE_NAME_SHAPE); - queryInput.setRecordPipe(getRecordPipe()); - new QueryAction().execute(queryInput); - } + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // define the process - an ELT from Persons to Persons - using the mock backend, and set to do many many records // + /////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + QProcessMetaData process = new StreamedETLWithFrontendProcess().defineProcessMetaData(TestUtils.TABLE_NAME_PERSON, TestUtils.TABLE_NAME_PERSON, ExtractViaQueryWithCustomLimitStep.class, TestTransformStep.class, LoadViaInsertStep.class); + process.setTableName(TestUtils.TABLE_NAME_SHAPE); + instance.addProcess(process); + + ///////////////////// + // run the process // + ///////////////////// + RunProcessInput request = new RunProcessInput(instance); + request.setSession(TestUtils.getMockSession()); + request.setProcessName(process.getName()); + request.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); + request.setCallback(new Callback()); + + RunProcessOutput result = new RunProcessAction().execute(request); + assertNotNull(result); + assertTrue(result.getException().isEmpty()); + + assertEquals(new ExtractViaQueryWithCustomLimitStep().getLimit(), result.getValues().get(StreamedETLProcess.FIELD_RECORD_COUNT)); + + // todo what can we assert? } @@ -121,7 +114,7 @@ class StreamedETLWithFrontendProcessTest /******************************************************************************* ** *******************************************************************************/ - public static class TestTransformStep extends AbstractTransformFunction + public static class TestTransformStep extends AbstractTransformStep { /******************************************************************************* @@ -134,8 +127,8 @@ class StreamedETLWithFrontendProcessTest for(QRecord qRecord : getInputRecordPage()) { QRecord newQRecord = new QRecord(); - newQRecord.setValue("id", null); - newQRecord.setValue("name", "Transformed: " + qRecord.getValueString("name")); + newQRecord.setValue("firstName", "Johnny"); + newQRecord.setValue("lastName", qRecord.getValueString("name")); getOutputRecordPage().add(newQRecord); } } @@ -146,34 +139,41 @@ class StreamedETLWithFrontendProcessTest /******************************************************************************* ** *******************************************************************************/ - public static class TestLoadStep extends AbstractLoadFunction + public static class Callback implements QProcessCallback { - /******************************************************************************* - ** Execute the backend step - using the request as input, and the result as output. - ** + ** Get the filter query for this callback. *******************************************************************************/ @Override - public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + public QQueryFilter getQueryFilter() { - InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance()); - insertInput.setSession(runBackendStepInput.getSession()); - insertInput.setTableName(TestUtils.TABLE_NAME_SHAPE); - insertInput.setTransaction(transaction); - insertInput.setRecords(getInputRecordPage()); - new InsertAction().execute(insertInput); + return (new QQueryFilter()); } /******************************************************************************* - ** + ** Get the field values for this callback. *******************************************************************************/ @Override - protected QBackendTransaction doOpenTransaction(RunBackendStepInput runBackendStepInput) throws QException + public Map getFieldValues(List fields) { - return null; + return (null); } } + + + /******************************************************************************* + ** The Mock backend - its query action will return as many rows as the limit - + ** so let's make sure to give it a big limit. + *******************************************************************************/ + public static class ExtractViaQueryWithCustomLimitStep extends ExtractViaQueryStep + { + @Override + public Integer getLimit() + { + return (10_000); + } + } } \ No newline at end of file diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/utils/TestUtils.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/utils/TestUtils.java index 9058bc4a..9984d2e3 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/utils/TestUtils.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/utils/TestUtils.java @@ -440,7 +440,17 @@ public class TestUtils *******************************************************************************/ public static List queryTable(String tableName) throws QException { - QueryInput queryInput = new QueryInput(TestUtils.defineInstance()); + return (queryTable(TestUtils.defineInstance(), tableName)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static List queryTable(QInstance instance, String tableName) throws QException + { + QueryInput queryInput = new QueryInput(instance); queryInput.setSession(TestUtils.getMockSession()); queryInput.setTableName(tableName); QueryOutput queryOutput = new QueryAction().execute(queryInput); diff --git a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java index 559db22c..05026f16 100644 --- a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java +++ b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSUpdateAction.java @@ -120,7 +120,7 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte boolean needToCloseConnection = false; if(updateInput.getTransaction() != null && updateInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction) { - LOG.debug("Using connection from insertInput [" + rdbmsTransaction.getConnection() + "]"); + LOG.debug("Using connection from updateInput [" + rdbmsTransaction.getConnection() + "]"); connection = rdbmsTransaction.getConnection(); } else