QQQ-37 checkpoint

This commit is contained in:
2022-08-23 16:54:36 -05:00
parent 834b4136de
commit c2972cd4df
16 changed files with 527 additions and 251 deletions

View File

@ -12,7 +12,10 @@ import org.apache.logging.log4j.Logger;
/******************************************************************************* /*******************************************************************************
** Class that knows how to Run an asynchronous job (lambda, supplier) that writes into a
** RecordPipe, with another lambda (consumer) that consumes records from the pipe.
** **
** Takes care of the job status monitoring, blocking when the pipe is empty, etc.
*******************************************************************************/ *******************************************************************************/
public class AsyncRecordPipeLoop public class AsyncRecordPipeLoop
{ {
@ -22,20 +25,30 @@ public class AsyncRecordPipeLoop
private static final int MAX_SLEEP_MS = 1000; private static final int MAX_SLEEP_MS = 1000;
private static final int INIT_SLEEP_MS = 10; private static final int INIT_SLEEP_MS = 10;
private static final int MIN_RECORDS_TO_CONSUME = 10;
/******************************************************************************* /*******************************************************************************
** Run an async-record-pipe-loop.
** **
** @param jobName name for the async job thread
** @param recordLimit optionally, cancel the supplier/job after this number of records.
* e.g., for a preview step.
** @param recordPipe constructed before this call, and used in both of the lambdas
** @param supplier lambda that adds records into the pipe.
* e.g., a query or extract step.
** @param consumer lambda that consumes records from the pipe
* e.g., a transform/load step.
*******************************************************************************/ *******************************************************************************/
public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction<AsyncJobCallback, ? extends Serializable> job, UnsafeSupplier<Integer> consumer) throws QException public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction<AsyncJobCallback, ? extends Serializable> supplier, UnsafeSupplier<Integer> consumer) throws QException
{ {
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
// start the extraction function as an async job // // start the extraction function as an async job //
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
AsyncJobManager asyncJobManager = new AsyncJobManager(); AsyncJobManager asyncJobManager = new AsyncJobManager();
String jobUUID = asyncJobManager.startJob(jobName, job::apply); String jobUUID = asyncJobManager.startJob(jobName, supplier::apply);
LOG.info("Started job [" + jobUUID + "] for record pipe streaming"); LOG.info("Started supplier job [" + jobUUID + "] for record pipe.");
AsyncJobState jobState = AsyncJobState.RUNNING; AsyncJobState jobState = AsyncJobState.RUNNING;
AsyncJobStatus asyncJobStatus = null; AsyncJobStatus asyncJobStatus = null;
@ -47,13 +60,13 @@ public class AsyncRecordPipeLoop
while(jobState.equals(AsyncJobState.RUNNING)) while(jobState.equals(AsyncJobState.RUNNING))
{ {
if(recordPipe.countAvailableRecords() == 0) if(recordPipe.countAvailableRecords() < MIN_RECORDS_TO_CONSUME)
{ {
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////
// if the pipe is empty, sleep to let the producer work. // // if the pipe is too empty, sleep to let the producer work. //
// todo - smarter sleep? like get notified vs. sleep? // // todo - smarter sleep? like get notified vs. sleep? //
/////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////
LOG.info("No records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work"); LOG.debug("Too few records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work");
SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS); SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS);
nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS); nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS);
@ -114,9 +127,9 @@ public class AsyncRecordPipeLoop
throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException())); throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException()));
} }
////////////////////////////////////////////////////// ////////////////////////////////////////////
// send the final records to transform & load steps // // send the final records to the consumer //
////////////////////////////////////////////////////// ////////////////////////////////////////////
recordCount += consumer.get(); recordCount += consumer.get();
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();

View File

@ -39,10 +39,16 @@ public interface QProcessCallback
/******************************************************************************* /*******************************************************************************
** Get the filter query for this callback. ** Get the filter query for this callback.
*******************************************************************************/ *******************************************************************************/
QQueryFilter getQueryFilter(); default QQueryFilter getQueryFilter()
{
return (null);
}
/******************************************************************************* /*******************************************************************************
** Get the field values for this callback. ** Get the field values for this callback.
*******************************************************************************/ *******************************************************************************/
Map<String, Serializable> getFieldValues(List<QFieldMetaData> fields); default Map<String, Serializable> getFieldValues(List<QFieldMetaData> fields)
{
return (null);
}
} }

View File

@ -23,12 +23,8 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed;
import java.util.List; import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.async.AsyncJobManager; import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
import com.kingsrook.qqq.backend.core.actions.async.AsyncJobState;
import com.kingsrook.qqq.backend.core.actions.async.AsyncJobStatus;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
@ -41,7 +37,6 @@ import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicE
import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLLoadFunction; import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLLoadFunction;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLProcess; import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLProcess;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLTransformFunction; import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLTransformFunction;
import com.kingsrook.qqq.backend.core.utils.SleepUtils;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -64,6 +59,7 @@ public class StreamedETLBackendStep implements BackendStep
** **
*******************************************************************************/ *******************************************************************************/
@Override @Override
@SuppressWarnings("checkstyle:indentation")
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{ {
QBackendTransaction transaction = openTransaction(runBackendStepInput); QBackendTransaction transaction = openTransaction(runBackendStepInput);
@ -74,96 +70,23 @@ public class StreamedETLBackendStep implements BackendStep
BasicETLExtractFunction basicETLExtractFunction = new BasicETLExtractFunction(); BasicETLExtractFunction basicETLExtractFunction = new BasicETLExtractFunction();
basicETLExtractFunction.setRecordPipe(recordPipe); basicETLExtractFunction.setRecordPipe(recordPipe);
////////////////////////////////////////// ////////////////////////////////////
// run the query action as an async job // // run the async-record-pipe loop //
////////////////////////////////////////// ////////////////////////////////////
AsyncJobManager asyncJobManager = new AsyncJobManager(); int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Extract", null, recordPipe, (status) ->
String queryJobUUID = asyncJobManager.startJob("StreamedETL>QueryAction", (status) ->
{ {
basicETLExtractFunction.run(runBackendStepInput, runBackendStepOutput); basicETLExtractFunction.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput); return (runBackendStepOutput);
}); },
LOG.info("Started query job [" + queryJobUUID + "] for streamed ETL"); () -> (consumeRecordsFromPipe(recordPipe, runBackendStepInput, runBackendStepOutput, transaction))
);
AsyncJobState queryJobState = AsyncJobState.RUNNING; runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
AsyncJobStatus asyncJobStatus = null;
long recordCount = 0;
int nextSleepMillis = INIT_SLEEP_MS;
long lastReceivedRecordsAt = System.currentTimeMillis();
long jobStartTime = System.currentTimeMillis();
while(queryJobState.equals(AsyncJobState.RUNNING))
{
if(recordPipe.countAvailableRecords() == 0)
{
///////////////////////////////////////////////////////////
// if the pipe is empty, sleep to let the producer work. //
// todo - smarter sleep? like get notified vs. sleep? //
///////////////////////////////////////////////////////////
LOG.info("No records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work");
SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS);
nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS);
long timeSinceLastReceivedRecord = System.currentTimeMillis() - lastReceivedRecordsAt;
if(timeSinceLastReceivedRecord > TIMEOUT_AFTER_NO_RECORDS_MS)
{
throw (new QException("Query action appears to have stopped producing records (last record received " + timeSinceLastReceivedRecord + " ms ago)."));
}
}
else
{
////////////////////////////////////////////////////////////////////////////////////////////////////////
// if the pipe has records, consume them. reset the sleep timer so if we sleep again it'll be short. //
////////////////////////////////////////////////////////////////////////////////////////////////////////
lastReceivedRecordsAt = System.currentTimeMillis();
nextSleepMillis = INIT_SLEEP_MS;
recordCount += consumeRecordsFromPipe(recordPipe, runBackendStepInput, runBackendStepOutput, transaction);
LOG.info(String.format("Processed %,d records so far", recordCount));
}
////////////////////////////////////
// refresh the query job's status //
////////////////////////////////////
Optional<AsyncJobStatus> optionalAsyncJobStatus = asyncJobManager.getJobStatus(queryJobUUID);
if(optionalAsyncJobStatus.isEmpty())
{
/////////////////////////////////////////////////
// todo - ... maybe some version of try-again? //
/////////////////////////////////////////////////
throw (new QException("Could not get status of report query job [" + queryJobUUID + "]"));
}
asyncJobStatus = optionalAsyncJobStatus.get();
queryJobState = asyncJobStatus.getState();
}
LOG.info("Query job [" + queryJobUUID + "] for ETL completed with status: " + asyncJobStatus);
/////////////////////////////////////////
// propagate errors from the query job //
/////////////////////////////////////////
if(asyncJobStatus.getState().equals(AsyncJobState.ERROR))
{
throw (new QException("Query job failed with an error", asyncJobStatus.getCaughtException()));
}
//////////////////////////////////////////////////////
// send the final records to transform & load steps //
//////////////////////////////////////////////////////
recordCount += consumeRecordsFromPipe(recordPipe, runBackendStepInput, runBackendStepOutput, transaction);
///////////////////// /////////////////////
// commit the work // // commit the work //
///////////////////// /////////////////////
transaction.commit(); transaction.commit();
long reportEndTime = System.currentTimeMillis();
LOG.info(String.format("Processed %,d records", recordCount)
+ String.format(" at end of ETL job in %,d ms (%.2f records/second).", (reportEndTime - jobStartTime), 1000d * (recordCount / (.001d + (reportEndTime - jobStartTime)))));
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
} }
catch(Exception e) catch(Exception e)
{ {

View File

@ -3,18 +3,41 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe; import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
/******************************************************************************* /*******************************************************************************
** Base class for the Extract logic of Streamed ETL processes.
** **
** These steps are invoked by both the "preview" and the "execute" steps of a
** StreamedETLWithFrontend process.
**
** Key here, is that subclasses here should put records that they're "Extracting"
** into the recordPipe member. That is to say, DO NOT use the recordList in
** the Step input/output objects.
**
** Ideally, they'll also stop once they've hit the "limit" number of records
** (though if you keep going, the pipe will get terminated and the job will be
** cancelled, etc...).
*******************************************************************************/ *******************************************************************************/
public abstract class AbstractExtractFunction implements BackendStep public abstract class AbstractExtractStep implements BackendStep
{ {
private RecordPipe recordPipe; private RecordPipe recordPipe;
private Integer limit; private Integer limit;
/*******************************************************************************
**
*******************************************************************************/
public Integer doCount(RunBackendStepInput runBackendStepInput) throws QException
{
return (null);
}
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/

View File

@ -3,6 +3,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.exceptions.QException;
@ -11,35 +12,33 @@ import com.kingsrook.qqq.backend.core.model.data.QRecord;
/******************************************************************************* /*******************************************************************************
** Base class for the Load (aka, store) logic of Streamed ETL processes.
** **
** Records are to be read out of the inputRecordPage field, and after storing,
** should be written to the outputRecordPage. That is to say, DO NOT use the
** recordList in the step input/output objects.
**
** Also - use the transaction member variable - though be aware, it
*******************************************************************************/ *******************************************************************************/
public abstract class AbstractLoadFunction implements BackendStep public abstract class AbstractLoadStep implements BackendStep
{ {
private List<QRecord> inputRecordPage = new ArrayList<>(); private List<QRecord> inputRecordPage = new ArrayList<>();
private List<QRecord> outputRecordPage = new ArrayList<>(); private List<QRecord> outputRecordPage = new ArrayList<>();
protected QBackendTransaction transaction; private Optional<QBackendTransaction> transaction = Optional.empty();
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public QBackendTransaction openTransaction(RunBackendStepInput runBackendStepInput) throws QException public Optional<QBackendTransaction> openTransaction(RunBackendStepInput runBackendStepInput) throws QException
{ {
this.transaction = doOpenTransaction(runBackendStepInput); return (Optional.empty());
return (transaction);
} }
/*******************************************************************************
**
*******************************************************************************/
protected abstract QBackendTransaction doOpenTransaction(RunBackendStepInput runBackendStepInput) throws QException;
/******************************************************************************* /*******************************************************************************
** Getter for recordPage ** Getter for recordPage
** **
@ -82,4 +81,25 @@ public abstract class AbstractLoadFunction implements BackendStep
this.outputRecordPage = outputRecordPage; this.outputRecordPage = outputRecordPage;
} }
/*******************************************************************************
** Setter for transaction
**
*******************************************************************************/
public void setTransaction(Optional<QBackendTransaction> transaction)
{
this.transaction = transaction;
}
/*******************************************************************************
** Getter for transaction
**
*******************************************************************************/
public Optional<QBackendTransaction> getTransaction()
{
return (transaction);
}
} }

View File

@ -8,9 +8,13 @@ import com.kingsrook.qqq.backend.core.model.data.QRecord;
/******************************************************************************* /*******************************************************************************
** Base class for the Transform logic of Streamed ETL processes.
** **
** Records are to be read out of the inputRecordPage field, and after transformation,
** should be written to the outputRecordPage. That is to say, DO NOT use the
** recordList in the step input/output objects.
*******************************************************************************/ *******************************************************************************/
public abstract class AbstractTransformFunction implements BackendStep public abstract class AbstractTransformStep implements BackendStep
{ {
private List<QRecord> inputRecordPage = new ArrayList<>(); private List<QRecord> inputRecordPage = new ArrayList<>();
private List<QRecord> outputRecordPage = new ArrayList<>(); private List<QRecord> outputRecordPage = new ArrayList<>();

View File

@ -11,17 +11,17 @@ import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
*******************************************************************************/ *******************************************************************************/
public class BaseStreamedETLStep public class BaseStreamedETLStep
{ {
protected static final int IN_MEMORY_RECORD_LIMIT = 20; protected static final int PROCESS_OUTPUT_RECORD_LIST_LIMIT = 20;
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
protected AbstractExtractFunction getExtractFunction(RunBackendStepInput runBackendStepInput) protected AbstractExtractStep getExtractStep(RunBackendStepInput runBackendStepInput)
{ {
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE); QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE);
return (QCodeLoader.getBackendStep(AbstractExtractFunction.class, codeReference)); return (QCodeLoader.getBackendStep(AbstractExtractStep.class, codeReference));
} }
@ -29,10 +29,10 @@ public class BaseStreamedETLStep
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
protected AbstractTransformFunction getTransformFunction(RunBackendStepInput runBackendStepInput) protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput)
{ {
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE); QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
return (QCodeLoader.getBackendStep(AbstractTransformFunction.class, codeReference)); return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference));
} }
@ -40,10 +40,10 @@ public class BaseStreamedETLStep
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
protected AbstractLoadFunction getLoadFunction(RunBackendStepInput runBackendStepInput) protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput)
{ {
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE); QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
return (QCodeLoader.getBackendStep(AbstractLoadFunction.class, codeReference)); return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference));
} }
} }

View File

@ -0,0 +1,125 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.io.IOException;
import java.io.Serializable;
import com.kingsrook.qqq.backend.core.actions.tables.CountAction;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.utils.JsonUtils;
/*******************************************************************************
** Generic implementation of an ExtractStep - that runs a Query action for a
** specified table.
**
** If a query is specified from the caller (e.g., using the process Callback
** mechanism), that will be used. Else a filter (object or json) in
** StreamedETLWithFrontendProcess.FIELD_DEFAULT_QUERY_FILTER will be checked.
*******************************************************************************/
public class ExtractViaQueryStep extends AbstractExtractStep
{
public static final String FIELD_SOURCE_TABLE = "sourceTable";
/*******************************************************************************
** Execute the backend step - using the request as input, and the result as output.
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance());
queryInput.setSession(runBackendStepInput.getSession());
queryInput.setTableName(runBackendStepInput.getValueString(FIELD_SOURCE_TABLE));
queryInput.setFilter(getQueryFilter(runBackendStepInput));
queryInput.setRecordPipe(getRecordPipe());
queryInput.setLimit(getLimit());
new QueryAction().execute(queryInput);
///////////////////////////////////////////////////////////////////
// output is done into the pipe - so, nothing for us to do here. //
///////////////////////////////////////////////////////////////////
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public Integer doCount(RunBackendStepInput runBackendStepInput) throws QException
{
CountInput countInput = new CountInput(runBackendStepInput.getInstance());
countInput.setSession(runBackendStepInput.getSession());
countInput.setTableName(runBackendStepInput.getValueString(FIELD_SOURCE_TABLE));
countInput.setFilter(getQueryFilter(runBackendStepInput));
CountOutput countOutput = new CountAction().execute(countInput);
return (countOutput.getCount());
}
/*******************************************************************************
**
*******************************************************************************/
protected QQueryFilter getQueryFilter(RunBackendStepInput runBackendStepInput) throws QException
{
//////////////////////////////////////////////////////////////////////////////////////
// if the queryFilterJson field is populated, read the filter from it and return it //
//////////////////////////////////////////////////////////////////////////////////////
String queryFilterJson = runBackendStepInput.getValueString("queryFilterJson");
if(queryFilterJson != null)
{
try
{
return (JsonUtils.toObject(queryFilterJson, QQueryFilter.class));
}
catch(IOException e)
{
throw new QException("Error loading query filter from json field", e);
}
}
else if(runBackendStepInput.getCallback() != null && runBackendStepInput.getCallback().getQueryFilter() != null)
{
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// else, try to get filter from process callback. if we got one, store it as a process value for later steps //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
QQueryFilter queryFilter = runBackendStepInput.getCallback().getQueryFilter();
runBackendStepInput.addValue("queryFilterJson", JsonUtils.toJson(queryFilter));
return (queryFilter);
}
else
{
/////////////////////////////////////////////////////
// else, see if a defaultQueryFilter was specified //
/////////////////////////////////////////////////////
Serializable defaultQueryFilter = runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_DEFAULT_QUERY_FILTER);
if(defaultQueryFilter instanceof QQueryFilter filter)
{
return (filter);
}
if(defaultQueryFilter instanceof String string)
{
try
{
return (JsonUtils.toObject(string, QQueryFilter.class));
}
catch(IOException e)
{
throw new QException("Error loading default query filter from json", e);
}
}
}
throw (new QException("Could not find query filter for Extract step."));
}
}

View File

@ -0,0 +1,54 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput;
/*******************************************************************************
** Generic implementation of a LoadStep - that runs an Insert action for a
** specified table.
*******************************************************************************/
public class LoadViaInsertStep extends AbstractLoadStep
{
public static final String FIELD_DESTINATION_TABLE = "destinationTable";
/*******************************************************************************
** Execute the backend step - using the request as input, and the result as output.
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance());
insertInput.setSession(runBackendStepInput.getSession());
insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
insertInput.setRecords(getInputRecordPage());
getTransaction().ifPresent(insertInput::setTransaction);
InsertOutput insertOutput = new InsertAction().execute(insertInput);
getOutputRecordPage().addAll(insertOutput.getRecords());
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public Optional<QBackendTransaction> openTransaction(RunBackendStepInput runBackendStepInput) throws QException
{
InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance());
insertInput.setSession(runBackendStepInput.getSession());
insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
return (Optional.of(new InsertAction().openTransaction(insertInput)));
}
}

View File

@ -0,0 +1,56 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateOutput;
/*******************************************************************************
** Generic implementation of a LoadStep - that runs an Update action for a
** specified table.
*******************************************************************************/
public class LoadViaUpdateStep extends AbstractLoadStep
{
public static final String FIELD_DESTINATION_TABLE = "destinationTable";
/*******************************************************************************
** Execute the backend step - using the request as input, and the result as output.
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
UpdateInput updateInput = new UpdateInput(runBackendStepInput.getInstance());
updateInput.setSession(runBackendStepInput.getSession());
updateInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
updateInput.setRecords(getInputRecordPage());
getTransaction().ifPresent(updateInput::setTransaction);
UpdateOutput updateOutput = new UpdateAction().execute(updateInput);
getOutputRecordPage().addAll(updateOutput.getRecords());
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public Optional<QBackendTransaction> openTransaction(RunBackendStepInput runBackendStepInput) throws QException
{
InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance());
insertInput.setSession(runBackendStepInput.getSession());
insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
return (Optional.of(new InsertAction().openTransaction(insertInput)));
}
}

View File

@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop; import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep; import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
@ -36,10 +37,14 @@ import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.Str
/******************************************************************************* /*******************************************************************************
** Backend step to do a execute a streamed ETL job ** Backend step to do the execute portion of a streamed ETL job.
**
** Works within a transaction (per the backend module of the destination table).
*******************************************************************************/ *******************************************************************************/
public class StreamedETLExecuteStep extends BaseStreamedETLStep implements BackendStep public class StreamedETLExecuteStep extends BaseStreamedETLStep implements BackendStep
{ {
private int currentRowCount = 1;
/******************************************************************************* /*******************************************************************************
@ -49,7 +54,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
@SuppressWarnings("checkstyle:indentation") @SuppressWarnings("checkstyle:indentation")
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{ {
QBackendTransaction transaction = null; Optional<QBackendTransaction> transaction = Optional.empty();
try try
{ {
@ -57,21 +62,22 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
// set up the extract, transform, and load functions // // set up the extract, transform, and load functions //
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
RecordPipe recordPipe = new RecordPipe(); RecordPipe recordPipe = new RecordPipe();
AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput); AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
extractFunction.setRecordPipe(recordPipe); extractStep.setRecordPipe(recordPipe);
AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput); AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
AbstractLoadFunction loadFunction = getLoadFunction(runBackendStepInput); AbstractLoadStep loadStep = getLoadStep(runBackendStepInput);
transaction = loadFunction.openTransaction(runBackendStepInput); transaction = loadStep.openTransaction(runBackendStepInput);
loadStep.setTransaction(transaction);
List<QRecord> loadedRecordList = new ArrayList<>(); List<QRecord> loadedRecordList = new ArrayList<>();
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractFunction", null, recordPipe, (status) -> int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) ->
{ {
extractFunction.run(runBackendStepInput, runBackendStepOutput); extractStep.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput); return (runBackendStepOutput);
}, },
() -> (consumeRecordsFromPipe(recordPipe, transformFunction, loadFunction, runBackendStepInput, runBackendStepOutput, loadedRecordList)) () -> (consumeRecordsFromPipe(recordPipe, transformStep, loadStep, runBackendStepInput, runBackendStepOutput, loadedRecordList))
); );
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount); runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
@ -80,9 +86,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
///////////////////// /////////////////////
// commit the work // // commit the work //
///////////////////// /////////////////////
if(transaction != null) if(transaction.isPresent())
{ {
transaction.commit(); transaction.get().commit();
} }
} }
catch(Exception e) catch(Exception e)
@ -90,9 +96,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
// rollback the work, then re-throw the error for up-stream to catch & report // // rollback the work, then re-throw the error for up-stream to catch & report //
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
if(transaction != null) if(transaction.isPresent())
{ {
transaction.rollback(); transaction.get().rollback();
} }
throw (e); throw (e);
} }
@ -101,9 +107,9 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
// always close our transactions (e.g., jdbc connections) // // always close our transactions (e.g., jdbc connections) //
//////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////
if(transaction != null) if(transaction.isPresent())
{ {
transaction.close(); transaction.get().close();
} }
} }
} }
@ -113,8 +119,14 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, AbstractLoadFunction loadFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> loadedRecordList) throws QException private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, AbstractLoadStep loadStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> loadedRecordList) throws QException
{ {
Integer totalRows = runBackendStepInput.getValueInteger(StreamedETLProcess.FIELD_RECORD_COUNT);
if(totalRows != null)
{
runBackendStepInput.getAsyncJobCallback().updateStatus(currentRowCount, totalRows);
}
/////////////////////////////////// ///////////////////////////////////
// get the records from the pipe // // get the records from the pipe //
/////////////////////////////////// ///////////////////////////////////
@ -123,26 +135,27 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
// pass the records through the transform function // // pass the records through the transform function //
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
transformFunction.setInputRecordPage(qRecords); transformStep.setInputRecordPage(qRecords);
transformFunction.setOutputRecordPage(new ArrayList<>()); transformStep.setOutputRecordPage(new ArrayList<>());
transformFunction.run(runBackendStepInput, runBackendStepOutput); transformStep.run(runBackendStepInput, runBackendStepOutput);
//////////////////////////////////////////////// ////////////////////////////////////////////////
// pass the records through the load function // // pass the records through the load function //
//////////////////////////////////////////////// ////////////////////////////////////////////////
loadFunction.setInputRecordPage(transformFunction.getOutputRecordPage()); loadStep.setInputRecordPage(transformStep.getOutputRecordPage());
loadFunction.setOutputRecordPage(new ArrayList<>()); loadStep.setOutputRecordPage(new ArrayList<>());
loadFunction.run(runBackendStepInput, runBackendStepOutput); loadStep.run(runBackendStepInput, runBackendStepOutput);
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
// copy a small number of records to the output list // // copy a small number of records to the output list //
/////////////////////////////////////////////////////// ///////////////////////////////////////////////////////
int i = 0; int i = 0;
while(loadedRecordList.size() < IN_MEMORY_RECORD_LIMIT && i < loadFunction.getOutputRecordPage().size()) while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < loadStep.getOutputRecordPage().size())
{ {
loadedRecordList.add(loadFunction.getOutputRecordPage().get(i++)); loadedRecordList.add(loadStep.getOutputRecordPage().get(i++));
} }
currentRowCount += qRecords.size();
return (qRecords.size()); return (qRecords.size());
} }

View File

@ -31,6 +31,7 @@ import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess;
/******************************************************************************* /*******************************************************************************
@ -48,19 +49,25 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{ {
RecordPipe recordPipe = new RecordPipe(); RecordPipe recordPipe = new RecordPipe();
AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput); AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
extractFunction.setLimit(IN_MEMORY_RECORD_LIMIT); // todo - process field? extractStep.setLimit(PROCESS_OUTPUT_RECORD_LIST_LIMIT); // todo - make this an input?
extractFunction.setRecordPipe(recordPipe); extractStep.setRecordPipe(recordPipe);
AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput); ///////////////////////////////////////////
// request a count from the extract step //
///////////////////////////////////////////
Integer recordCount = extractStep.doCount(runBackendStepInput);
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
List<QRecord> transformedRecordList = new ArrayList<>(); List<QRecord> transformedRecordList = new ArrayList<>();
new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractFunction", IN_MEMORY_RECORD_LIMIT, recordPipe, (status) -> new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractStep", PROCESS_OUTPUT_RECORD_LIST_LIMIT, recordPipe, (status) ->
{ {
extractFunction.run(runBackendStepInput, runBackendStepOutput); extractStep.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput); return (runBackendStepOutput);
}, },
() -> (consumeRecordsFromPipe(recordPipe, transformFunction, runBackendStepInput, runBackendStepOutput, transformedRecordList)) () -> (consumeRecordsFromPipe(recordPipe, transformStep, runBackendStepInput, runBackendStepOutput, transformedRecordList))
); );
runBackendStepOutput.setRecords(transformedRecordList); runBackendStepOutput.setRecords(transformedRecordList);
@ -71,7 +78,7 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> transformedRecordList) throws QException private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformStep transformStep, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> transformedRecordList) throws QException
{ {
/////////////////////////////////// ///////////////////////////////////
// get the records from the pipe // // get the records from the pipe //
@ -81,13 +88,13 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
// pass the records through the transform function // // pass the records through the transform function //
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
transformFunction.setInputRecordPage(qRecords); transformStep.setInputRecordPage(qRecords);
transformFunction.run(runBackendStepInput, runBackendStepOutput); transformStep.run(runBackendStepInput, runBackendStepOutput);
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
// add the transformed records to the output list // // add the transformed records to the output list //
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
transformedRecordList.addAll(transformFunction.getOutputRecordPage()); transformedRecordList.addAll(transformStep.getOutputRecordPage());
return (qRecords.size()); return (qRecords.size());
} }

View File

@ -34,6 +34,20 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
/******************************************************************************* /*******************************************************************************
** Definition for Streamed ETL process that includes a frontend. ** Definition for Streamed ETL process that includes a frontend.
** **
** This process uses 2 backend steps, and 2 frontend steps, as follows:
** - preview (backend) - does just a little work (limited # of rows), to give the
** user a preview of what the final result will be - e.g., some data to seed the review screen
** - review (frontend) - a review screen
** - execute (backend) - processes all the rows, does all the work.
** - result (frontend) - a result screen
**
** The preview & execute steps use additional BackendStep codes:
** - Extract - gets the rows to be processed. Used in preview (but only for a
** limited number of rows), and execute (without limit)
** - Transform - do whatever transformation is needed to the rows. Done on preview
** and execute. Always works with a "page" of records at a time.
** - Load - store the records into the backend, as appropriate. Always works
** with a "page" of records at a time.
*******************************************************************************/ *******************************************************************************/
public class StreamedETLWithFrontendProcess public class StreamedETLWithFrontendProcess
{ {
@ -49,23 +63,30 @@ public class StreamedETLWithFrontendProcess
public static final String FIELD_LOAD_CODE = "load"; public static final String FIELD_LOAD_CODE = "load";
public static final String FIELD_SOURCE_TABLE = "sourceTable"; public static final String FIELD_SOURCE_TABLE = "sourceTable";
public static final String FIELD_DEFAULT_QUERY_FILTER = "defaultQueryFilter";
public static final String FIELD_DESTINATION_TABLE = "destinationTable"; public static final String FIELD_DESTINATION_TABLE = "destinationTable";
public static final String FIELD_MAPPING_JSON = "mappingJSON";
public static final String FIELD_RECORD_COUNT = "recordCount";
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public QProcessMetaData defineProcessMetaData() public QProcessMetaData defineProcessMetaData(
String sourceTableName,
String destinationTableName,
Class<? extends AbstractExtractStep> extractStepClass,
Class<? extends AbstractTransformStep> transformStepClass,
Class<? extends AbstractLoadStep> loadStepClass
)
{ {
QStepMetaData previewStep = new QBackendStepMetaData() QStepMetaData previewStep = new QBackendStepMetaData()
.withName(STEP_NAME_PREVIEW) .withName(STEP_NAME_PREVIEW)
.withCode(new QCodeReference(StreamedETLPreviewStep.class)) .withCode(new QCodeReference(StreamedETLPreviewStep.class))
.withInputData(new QFunctionInputMetaData() .withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE)) .withField(new QFieldMetaData().withName(FIELD_SOURCE_TABLE).withDefaultValue(sourceTableName))
.withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE))); .withField(new QFieldMetaData().withName(FIELD_DEFAULT_QUERY_FILTER))
.withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE).withDefaultValue(new QCodeReference(extractStepClass)))
.withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE).withDefaultValue(new QCodeReference(transformStepClass))));
QFrontendStepMetaData reviewStep = new QFrontendStepMetaData() QFrontendStepMetaData reviewStep = new QFrontendStepMetaData()
.withName(STEP_NAME_REVIEW); .withName(STEP_NAME_REVIEW);
@ -74,7 +95,8 @@ public class StreamedETLWithFrontendProcess
.withName(STEP_NAME_EXECUTE) .withName(STEP_NAME_EXECUTE)
.withCode(new QCodeReference(StreamedETLExecuteStep.class)) .withCode(new QCodeReference(StreamedETLExecuteStep.class))
.withInputData(new QFunctionInputMetaData() .withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_LOAD_CODE))); .withField(new QFieldMetaData().withName(FIELD_DESTINATION_TABLE).withDefaultValue(destinationTableName))
.withField(new QFieldMetaData().withName(FIELD_LOAD_CODE).withDefaultValue(new QCodeReference(loadStepClass))));
QFrontendStepMetaData resultStep = new QFrontendStepMetaData() QFrontendStepMetaData resultStep = new QFrontendStepMetaData()
.withName(STEP_NAME_RESULT); .withName(STEP_NAME_RESULT);

View File

@ -1,23 +1,22 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend; package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.io.Serializable;
import java.util.List; import java.util.List;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction; import java.util.Map;
import com.kingsrook.qqq.backend.core.actions.processes.QProcessCallback;
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction; import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance; import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData; import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess;
import com.kingsrook.qqq.backend.core.utils.TestUtils; import com.kingsrook.qqq.backend.core.utils.TestUtils;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -32,65 +31,47 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class StreamedETLWithFrontendProcessTest class StreamedETLWithFrontendProcessTest
{ {
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
@Test @Test
void test() throws QException void testSimpleSmallQueryTransformInsert() throws QException
{ {
QProcessMetaData process = new StreamedETLWithFrontendProcess().defineProcessMetaData();
process.setTableName(TestUtils.TABLE_NAME_SHAPE);
for(QFieldMetaData inputField : process.getInputFields())
{
if(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE.equals(inputField.getName()))
{
inputField.setDefaultValue(new QCodeReference(TestExtractStep.class));
}
else if(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE.equals(inputField.getName()))
{
inputField.setDefaultValue(new QCodeReference(TestTransformStep.class));
}
else if(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE.equals(inputField.getName()))
{
inputField.setDefaultValue(new QCodeReference(TestLoadStep.class));
}
}
QInstance instance = TestUtils.defineInstance(); QInstance instance = TestUtils.defineInstance();
////////////////////////////////////////////////////////
// define the process - an ELT from Shapes to Persons //
////////////////////////////////////////////////////////
QProcessMetaData process = new StreamedETLWithFrontendProcess().defineProcessMetaData(TestUtils.TABLE_NAME_SHAPE, TestUtils.TABLE_NAME_PERSON, ExtractViaQueryStep.class, TestTransformStep.class, LoadViaInsertStep.class);
process.setTableName(TestUtils.TABLE_NAME_SHAPE);
instance.addProcess(process); instance.addProcess(process);
InsertInput insertInput = new InsertInput(instance); ///////////////////////////////////////////////////////
insertInput.setSession(TestUtils.getMockSession()); // switch the person table to use the memory backend //
insertInput.setTableName(TestUtils.TABLE_NAME_SHAPE); ///////////////////////////////////////////////////////
insertInput.setRecords(List.of( instance.getTable(TestUtils.TABLE_NAME_PERSON).setBackendName(TestUtils.MEMORY_BACKEND_NAME);
new QRecord().withTableName(TestUtils.TABLE_NAME_SHAPE).withValue("id", 1).withValue("name", "Circle"),
new QRecord().withTableName(TestUtils.TABLE_NAME_SHAPE).withValue("id", 2).withValue("name", "Triangle"),
new QRecord().withTableName(TestUtils.TABLE_NAME_SHAPE).withValue("id", 3).withValue("name", "Square")
));
new InsertAction().execute(insertInput);
List<QRecord> preList = TestUtils.queryTable(TestUtils.TABLE_NAME_SHAPE); TestUtils.insertDefaultShapes(instance);
/////////////////////
// run the process //
/////////////////////
RunProcessInput request = new RunProcessInput(instance); RunProcessInput request = new RunProcessInput(instance);
request.setSession(TestUtils.getMockSession()); request.setSession(TestUtils.getMockSession());
request.setProcessName(process.getName()); request.setProcessName(process.getName());
request.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); request.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
request.setCallback(new Callback());
RunProcessOutput result = new RunProcessAction().execute(request); RunProcessOutput result = new RunProcessAction().execute(request);
assertNotNull(result); assertNotNull(result);
assertTrue(result.getException().isEmpty()); assertTrue(result.getException().isEmpty());
List<QRecord> postList = TestUtils.queryTable(TestUtils.TABLE_NAME_SHAPE); List<QRecord> postList = TestUtils.queryTable(instance, TestUtils.TABLE_NAME_PERSON);
assertEquals(6, postList.size());
assertThat(postList) assertThat(postList)
.anyMatch(qr -> qr.getValue("name").equals("Circle")) .as("Should have inserted Circle").anyMatch(qr -> qr.getValue("lastName").equals("Circle"))
.anyMatch(qr -> qr.getValue("name").equals("Triangle")) .as("Should have inserted Triangle").anyMatch(qr -> qr.getValue("lastName").equals("Triangle"))
.anyMatch(qr -> qr.getValue("name").equals("Square")) .as("Should have inserted Square").anyMatch(qr -> qr.getValue("lastName").equals("Square"));
.anyMatch(qr -> qr.getValue("name").equals("Transformed: Circle"))
.anyMatch(qr -> qr.getValue("name").equals("Transformed: Triangle"))
.anyMatch(qr -> qr.getValue("name").equals("Transformed: Square"));
} }
@ -98,22 +79,34 @@ class StreamedETLWithFrontendProcessTest
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public static class TestExtractStep extends AbstractExtractFunction @Test
void testBig() throws QException
{ {
QInstance instance = TestUtils.defineInstance();
/******************************************************************************* ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
** Execute the backend step - using the request as input, and the result as output. // define the process - an ELT from Persons to Persons - using the mock backend, and set to do many many records //
** ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////
*******************************************************************************/ QProcessMetaData process = new StreamedETLWithFrontendProcess().defineProcessMetaData(TestUtils.TABLE_NAME_PERSON, TestUtils.TABLE_NAME_PERSON, ExtractViaQueryWithCustomLimitStep.class, TestTransformStep.class, LoadViaInsertStep.class);
@Override process.setTableName(TestUtils.TABLE_NAME_SHAPE);
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException instance.addProcess(process);
{
QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance()); /////////////////////
queryInput.setSession(runBackendStepInput.getSession()); // run the process //
queryInput.setTableName(TestUtils.TABLE_NAME_SHAPE); /////////////////////
queryInput.setRecordPipe(getRecordPipe()); RunProcessInput request = new RunProcessInput(instance);
new QueryAction().execute(queryInput); request.setSession(TestUtils.getMockSession());
} request.setProcessName(process.getName());
request.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
request.setCallback(new Callback());
RunProcessOutput result = new RunProcessAction().execute(request);
assertNotNull(result);
assertTrue(result.getException().isEmpty());
assertEquals(new ExtractViaQueryWithCustomLimitStep().getLimit(), result.getValues().get(StreamedETLProcess.FIELD_RECORD_COUNT));
// todo what can we assert?
} }
@ -121,7 +114,7 @@ class StreamedETLWithFrontendProcessTest
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public static class TestTransformStep extends AbstractTransformFunction public static class TestTransformStep extends AbstractTransformStep
{ {
/******************************************************************************* /*******************************************************************************
@ -134,8 +127,8 @@ class StreamedETLWithFrontendProcessTest
for(QRecord qRecord : getInputRecordPage()) for(QRecord qRecord : getInputRecordPage())
{ {
QRecord newQRecord = new QRecord(); QRecord newQRecord = new QRecord();
newQRecord.setValue("id", null); newQRecord.setValue("firstName", "Johnny");
newQRecord.setValue("name", "Transformed: " + qRecord.getValueString("name")); newQRecord.setValue("lastName", qRecord.getValueString("name"));
getOutputRecordPage().add(newQRecord); getOutputRecordPage().add(newQRecord);
} }
} }
@ -146,34 +139,41 @@ class StreamedETLWithFrontendProcessTest
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public static class TestLoadStep extends AbstractLoadFunction public static class Callback implements QProcessCallback
{ {
/******************************************************************************* /*******************************************************************************
** Execute the backend step - using the request as input, and the result as output. ** Get the filter query for this callback.
**
*******************************************************************************/ *******************************************************************************/
@Override @Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException public QQueryFilter getQueryFilter()
{ {
InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance()); return (new QQueryFilter());
insertInput.setSession(runBackendStepInput.getSession());
insertInput.setTableName(TestUtils.TABLE_NAME_SHAPE);
insertInput.setTransaction(transaction);
insertInput.setRecords(getInputRecordPage());
new InsertAction().execute(insertInput);
} }
/******************************************************************************* /*******************************************************************************
** ** Get the field values for this callback.
*******************************************************************************/ *******************************************************************************/
@Override @Override
protected QBackendTransaction doOpenTransaction(RunBackendStepInput runBackendStepInput) throws QException public Map<String, Serializable> getFieldValues(List<QFieldMetaData> fields)
{ {
return null; return (null);
} }
} }
/*******************************************************************************
** The Mock backend - its query action will return as many rows as the limit -
** so let's make sure to give it a big limit.
*******************************************************************************/
public static class ExtractViaQueryWithCustomLimitStep extends ExtractViaQueryStep
{
@Override
public Integer getLimit()
{
return (10_000);
}
}
} }

View File

@ -440,7 +440,17 @@ public class TestUtils
*******************************************************************************/ *******************************************************************************/
public static List<QRecord> queryTable(String tableName) throws QException public static List<QRecord> queryTable(String tableName) throws QException
{ {
QueryInput queryInput = new QueryInput(TestUtils.defineInstance()); return (queryTable(TestUtils.defineInstance(), tableName));
}
/*******************************************************************************
**
*******************************************************************************/
public static List<QRecord> queryTable(QInstance instance, String tableName) throws QException
{
QueryInput queryInput = new QueryInput(instance);
queryInput.setSession(TestUtils.getMockSession()); queryInput.setSession(TestUtils.getMockSession());
queryInput.setTableName(tableName); queryInput.setTableName(tableName);
QueryOutput queryOutput = new QueryAction().execute(queryInput); QueryOutput queryOutput = new QueryAction().execute(queryInput);

View File

@ -120,7 +120,7 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte
boolean needToCloseConnection = false; boolean needToCloseConnection = false;
if(updateInput.getTransaction() != null && updateInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction) if(updateInput.getTransaction() != null && updateInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction)
{ {
LOG.debug("Using connection from insertInput [" + rdbmsTransaction.getConnection() + "]"); LOG.debug("Using connection from updateInput [" + rdbmsTransaction.getConnection() + "]");
connection = rdbmsTransaction.getConnection(); connection = rdbmsTransaction.getConnection();
} }
else else