QQQ-37 initial buidout of StreamedETLWithFrontendProcess

This commit is contained in:
2022-08-22 08:34:55 -05:00
parent 99f724e2c2
commit a86f42f373
22 changed files with 998 additions and 49 deletions

View File

@ -31,7 +31,6 @@ import com.kingsrook.qqq.backend.core.state.UUIDAndTypeStateKey;
** Argument passed to an AsyncJob when it runs, which can be used to communicate
** data back out of the job.
**
** TODO - future - allow cancellation to be indicated here?
*******************************************************************************/
public class AsyncJobCallback
{
@ -107,4 +106,17 @@ public class AsyncJobCallback
AsyncJobManager.getStateProvider().put(new UUIDAndTypeStateKey(jobUUID, StateType.ASYNC_JOB_STATUS), asyncJobStatus);
}
/*******************************************************************************
** Check if the asyncJobStatus had a cancellation requested.
**
** TODO - concern about multiple threads writing this object to a non-in-memory
** state provider, and this value getting lost...
*******************************************************************************/
public boolean wasCancelRequested()
{
return (this.asyncJobStatus.getCancelRequested());
}
}

View File

@ -183,4 +183,15 @@ public class AsyncJobManager
// return TempFileStateProvider.getInstance();
}
/*******************************************************************************
**
*******************************************************************************/
public void cancelJob(String jobUUID)
{
Optional<AsyncJobStatus> jobStatus = getJobStatus(jobUUID);
jobStatus.ifPresent(asyncJobStatus -> asyncJobStatus.setCancelRequested(true));
}
}

View File

@ -37,6 +37,8 @@ public class AsyncJobStatus implements Serializable
private Integer total;
private Exception caughtException;
private boolean cancelRequested;
/*******************************************************************************
@ -163,4 +165,26 @@ public class AsyncJobStatus implements Serializable
{
this.caughtException = caughtException;
}
/*******************************************************************************
** Getter for cancelRequested
**
*******************************************************************************/
public boolean getCancelRequested()
{
return cancelRequested;
}
/*******************************************************************************
** Setter for cancelRequested
**
*******************************************************************************/
public void setCancelRequested(boolean cancelRequested)
{
this.cancelRequested = cancelRequested;
}
}

View File

@ -0,0 +1,157 @@
package com.kingsrook.qqq.backend.core.actions.async;
import java.io.Serializable;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.utils.SleepUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/*******************************************************************************
**
*******************************************************************************/
public class AsyncRecordPipeLoop
{
private static final Logger LOG = LogManager.getLogger(AsyncRecordPipeLoop.class);
private static final int TIMEOUT_AFTER_NO_RECORDS_MS = 10 * 60 * 1000;
private static final int MAX_SLEEP_MS = 1000;
private static final int INIT_SLEEP_MS = 10;
/*******************************************************************************
**
*******************************************************************************/
public int run(String jobName, Integer recordLimit, RecordPipe recordPipe, UnsafeFunction<AsyncJobCallback, ? extends Serializable> job, UnsafeSupplier<Integer> consumer) throws QException
{
///////////////////////////////////////////////////
// start the extraction function as an async job //
///////////////////////////////////////////////////
AsyncJobManager asyncJobManager = new AsyncJobManager();
String jobUUID = asyncJobManager.startJob(jobName, job::apply);
LOG.info("Started job [" + jobUUID + "] for record pipe streaming");
AsyncJobState jobState = AsyncJobState.RUNNING;
AsyncJobStatus asyncJobStatus = null;
int recordCount = 0;
int nextSleepMillis = INIT_SLEEP_MS;
long lastReceivedRecordsAt = System.currentTimeMillis();
long jobStartTime = System.currentTimeMillis();
while(jobState.equals(AsyncJobState.RUNNING))
{
if(recordPipe.countAvailableRecords() == 0)
{
///////////////////////////////////////////////////////////
// if the pipe is empty, sleep to let the producer work. //
// todo - smarter sleep? like get notified vs. sleep? //
///////////////////////////////////////////////////////////
LOG.info("No records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work");
SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS);
nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS);
long timeSinceLastReceivedRecord = System.currentTimeMillis() - lastReceivedRecordsAt;
if(timeSinceLastReceivedRecord > TIMEOUT_AFTER_NO_RECORDS_MS)
{
throw (new QException("Job appears to have stopped producing records (last record received " + timeSinceLastReceivedRecord + " ms ago)."));
}
}
else
{
////////////////////////////////////////////////////////////////////////////////////////////////////////
// if the pipe has records, consume them. reset the sleep timer so if we sleep again it'll be short. //
////////////////////////////////////////////////////////////////////////////////////////////////////////
lastReceivedRecordsAt = System.currentTimeMillis();
nextSleepMillis = INIT_SLEEP_MS;
recordCount += consumer.get();
LOG.info(String.format("Processed %,d records so far", recordCount));
if(recordLimit != null && recordCount >= recordLimit)
{
asyncJobManager.cancelJob(jobUUID);
////////////////////////////////////////////////////////////////////////////////////////////////////
// in case the extract function doesn't recognize the cancellation request, //
// tell the pipe to "terminate" - meaning - flush its queue and just noop when given new records. //
// this should prevent anyone writing to such a pipe from potentially filling & blocking. //
////////////////////////////////////////////////////////////////////////////////////////////////////
recordPipe.terminate();
break;
}
}
//////////////////////////////
// refresh the job's status //
//////////////////////////////
Optional<AsyncJobStatus> optionalAsyncJobStatus = asyncJobManager.getJobStatus(jobUUID);
if(optionalAsyncJobStatus.isEmpty())
{
/////////////////////////////////////////////////
// todo - ... maybe some version of try-again? //
/////////////////////////////////////////////////
throw (new QException("Could not get status of job [" + jobUUID + "]"));
}
asyncJobStatus = optionalAsyncJobStatus.get();
jobState = asyncJobStatus.getState();
}
LOG.info("Job [" + jobUUID + "] completed with status: " + asyncJobStatus);
///////////////////////////////////
// propagate errors from the job //
///////////////////////////////////
if(asyncJobStatus != null && asyncJobStatus.getState().equals(AsyncJobState.ERROR))
{
throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException()));
}
//////////////////////////////////////////////////////
// send the final records to transform & load steps //
//////////////////////////////////////////////////////
recordCount += consumer.get();
long endTime = System.currentTimeMillis();
LOG.info(String.format("Processed %,d records", recordCount)
+ String.format(" at end of job in %,d ms (%.2f records/second).", (endTime - jobStartTime), 1000d * (recordCount / (.001d + (endTime - jobStartTime)))));
return (recordCount);
}
/*******************************************************************************
**
*******************************************************************************/
@FunctionalInterface
public interface UnsafeFunction<T, R>
{
/*******************************************************************************
**
*******************************************************************************/
R apply(T t) throws QException;
}
/*******************************************************************************
**
*******************************************************************************/
@FunctionalInterface
public interface UnsafeSupplier<T>
{
/*******************************************************************************
**
*******************************************************************************/
T get() throws QException;
}
}

View File

@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.actions.customizers;
import java.util.Optional;
import java.util.function.Function;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeType;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
@ -93,4 +94,44 @@ public class QCodeLoader
}
}
/*******************************************************************************
**
*******************************************************************************/
public static <T extends BackendStep> T getBackendStep(Class<T> expectedType, QCodeReference codeReference)
{
if(codeReference == null)
{
return (null);
}
if(!codeReference.getCodeType().equals(QCodeType.JAVA))
{
///////////////////////////////////////////////////////////////////////////////////////
// todo - 1) support more languages, 2) wrap them w/ java Functions here, 3) profit! //
///////////////////////////////////////////////////////////////////////////////////////
throw (new IllegalArgumentException("Only JAVA BackendSteps are supported at this time."));
}
try
{
Class<?> customizerClass = Class.forName(codeReference.getName());
return ((T) customizerClass.getConstructor().newInstance());
}
catch(Exception e)
{
LOG.error("Error initializing customizer: " + codeReference);
//////////////////////////////////////////////////////////////////////////////////////////////////////////
// return null here - under the assumption that during normal run-time operations, we'll never hit here //
// as we'll want to validate all functions in the instance validator at startup time (and IT will throw //
// if it finds an invalid code reference //
//////////////////////////////////////////////////////////////////////////////////////////////////////////
return (null);
}
}
}

View File

@ -22,7 +22,6 @@
package com.kingsrook.qqq.backend.core.actions.interfaces;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput;
@ -32,19 +31,11 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertOutput;
** Interface for the Insert action.
**
*******************************************************************************/
public interface InsertInterface
public interface InsertInterface extends QActionInterface
{
/*******************************************************************************
**
*******************************************************************************/
InsertOutput execute(InsertInput insertInput) throws QException;
/*******************************************************************************
**
*******************************************************************************/
default QBackendTransaction openTransaction(InsertInput insertInput) throws QException
{
return (new QBackendTransaction());
}
}

View File

@ -0,0 +1,23 @@
package com.kingsrook.qqq.backend.core.actions.interfaces;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput;
/*******************************************************************************
**
*******************************************************************************/
public interface QActionInterface
{
/*******************************************************************************
**
*******************************************************************************/
default QBackendTransaction openTransaction(AbstractTableActionInput input) throws QException
{
return (new QBackendTransaction());
}
}

View File

@ -42,16 +42,36 @@ public class RecordPipe
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(1_000);
private boolean isTerminated = false;
/*******************************************************************************
** Turn off the pipe. Stop accepting new records (just ignore them in the add
** method). Clear the existing queue. Don't return any more records. Note that
** if consumeAvailableRecords was running in another thread, it may still return
** some records that it read before this call.
*******************************************************************************/
public void terminate()
{
isTerminated = true;
queue.clear();
}
/*******************************************************************************
** Add a record to the pipe
** Returns true iff the record fit in the pipe; false if the pipe is currently full.
*******************************************************************************/
public void addRecord(QRecord record)
{
if(isTerminated)
{
return;
}
boolean offerResult = queue.offer(record);
while(!offerResult)
while(!offerResult && !isTerminated)
{
LOG.debug("Record pipe.add failed (due to full pipe). Blocking.");
SleepUtils.sleep(100, TimeUnit.MILLISECONDS);
@ -78,7 +98,7 @@ public class RecordPipe
{
List<QRecord> rs = new ArrayList<>();
while(true)
while(!isTerminated)
{
QRecord record = queue.poll();
if(record == null)
@ -98,6 +118,11 @@ public class RecordPipe
*******************************************************************************/
public int countAvailableRecords()
{
if(isTerminated)
{
return (0);
}
return (queue.size());
}

View File

@ -25,8 +25,8 @@ package com.kingsrook.qqq.backend.core.model.actions.tables.query;
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
import com.kingsrook.qqq.backend.core.actions.customizers.CustomizerLoader;
import com.kingsrook.qqq.backend.core.actions.customizers.Customizers;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.model.actions.AbstractActionOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import org.apache.logging.log4j.LogManager;
@ -62,7 +62,7 @@ public class QueryOutput extends AbstractActionOutput implements Serializable
storage = new QueryOutputList();
}
postQueryRecordCustomizer = (Function<QRecord, QRecord>) CustomizerLoader.getTableCustomizerFunction(queryInput.getTable(), Customizers.POST_QUERY_RECORD);
postQueryRecordCustomizer = (Function<QRecord, QRecord>) QCodeLoader.getTableCustomizerFunction(queryInput.getTable(), Customizers.POST_QUERY_RECORD);
}

View File

@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.model.actions.tables.update;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
@ -34,6 +35,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
*******************************************************************************/
public class UpdateInput extends AbstractTableActionInput
{
private QBackendTransaction transaction;
private List<QRecord> records;
////////////////////////////////////////////////////////////////////////////////////////////
@ -65,6 +67,40 @@ public class UpdateInput extends AbstractTableActionInput
/*******************************************************************************
** Getter for transaction
**
*******************************************************************************/
public QBackendTransaction getTransaction()
{
return transaction;
}
/*******************************************************************************
** Setter for transaction
**
*******************************************************************************/
public void setTransaction(QBackendTransaction transaction)
{
this.transaction = transaction;
}
/*******************************************************************************
** Fluent setter for transaction
**
*******************************************************************************/
public UpdateInput withTransaction(QBackendTransaction transaction)
{
this.transaction = transaction;
return (this);
}
/*******************************************************************************
** Getter for records
**

View File

@ -228,6 +228,16 @@ public class QProcessMetaData implements QAppChildMetaData
/*******************************************************************************
** Wrapper to getStep, that internally casts to FrontendStepMetaData
*******************************************************************************/
public QFrontendStepMetaData getFrontendStep(String name)
{
return (QFrontendStepMetaData) getStep(name);
}
/*******************************************************************************
** Get a list of all of the input fields used by all the steps in this process.
*******************************************************************************/

View File

@ -0,0 +1,58 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
/*******************************************************************************
**
*******************************************************************************/
public abstract class AbstractExtractFunction implements BackendStep
{
private RecordPipe recordPipe;
private Integer limit;
/*******************************************************************************
**
*******************************************************************************/
public void setRecordPipe(RecordPipe recordPipe)
{
this.recordPipe = recordPipe;
}
/*******************************************************************************
**
*******************************************************************************/
public RecordPipe getRecordPipe()
{
return recordPipe;
}
/*******************************************************************************
** Getter for limit
**
*******************************************************************************/
public Integer getLimit()
{
return limit;
}
/*******************************************************************************
** Setter for limit
**
*******************************************************************************/
public void setLimit(Integer limit)
{
this.limit = limit;
}
}

View File

@ -0,0 +1,85 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
/*******************************************************************************
**
*******************************************************************************/
public abstract class AbstractLoadFunction implements BackendStep
{
private List<QRecord> inputRecordPage = new ArrayList<>();
private List<QRecord> outputRecordPage = new ArrayList<>();
protected QBackendTransaction transaction;
/*******************************************************************************
**
*******************************************************************************/
public QBackendTransaction openTransaction(RunBackendStepInput runBackendStepInput) throws QException
{
this.transaction = doOpenTransaction(runBackendStepInput);
return (transaction);
}
/*******************************************************************************
**
*******************************************************************************/
protected abstract QBackendTransaction doOpenTransaction(RunBackendStepInput runBackendStepInput) throws QException;
/*******************************************************************************
** Getter for recordPage
**
*******************************************************************************/
public List<QRecord> getInputRecordPage()
{
return inputRecordPage;
}
/*******************************************************************************
** Setter for recordPage
**
*******************************************************************************/
public void setInputRecordPage(List<QRecord> inputRecordPage)
{
this.inputRecordPage = inputRecordPage;
}
/*******************************************************************************
** Getter for outputRecordPage
**
*******************************************************************************/
public List<QRecord> getOutputRecordPage()
{
return outputRecordPage;
}
/*******************************************************************************
** Setter for outputRecordPage
**
*******************************************************************************/
public void setOutputRecordPage(List<QRecord> outputRecordPage)
{
this.outputRecordPage = outputRecordPage;
}
}

View File

@ -0,0 +1,61 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
/*******************************************************************************
**
*******************************************************************************/
public abstract class AbstractTransformFunction implements BackendStep
{
private List<QRecord> inputRecordPage = new ArrayList<>();
private List<QRecord> outputRecordPage = new ArrayList<>();
/*******************************************************************************
** Getter for recordPage
**
*******************************************************************************/
public List<QRecord> getInputRecordPage()
{
return inputRecordPage;
}
/*******************************************************************************
** Setter for recordPage
**
*******************************************************************************/
public void setInputRecordPage(List<QRecord> inputRecordPage)
{
this.inputRecordPage = inputRecordPage;
}
/*******************************************************************************
** Getter for outputRecordPage
**
*******************************************************************************/
public List<QRecord> getOutputRecordPage()
{
return outputRecordPage;
}
/*******************************************************************************
** Setter for outputRecordPage
**
*******************************************************************************/
public void setOutputRecordPage(List<QRecord> outputRecordPage)
{
this.outputRecordPage = outputRecordPage;
}
}

View File

@ -0,0 +1,49 @@
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
/*******************************************************************************
** Base class for the StreamedETL preview & execute steps
*******************************************************************************/
public class BaseStreamedETLStep
{
protected static final int IN_MEMORY_RECORD_LIMIT = 20;
/*******************************************************************************
**
*******************************************************************************/
protected AbstractExtractFunction getExtractFunction(RunBackendStepInput runBackendStepInput)
{
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_EXTRACT_CODE);
return (QCodeLoader.getBackendStep(AbstractExtractFunction.class, codeReference));
}
/*******************************************************************************
**
*******************************************************************************/
protected AbstractTransformFunction getTransformFunction(RunBackendStepInput runBackendStepInput)
{
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
return (QCodeLoader.getBackendStep(AbstractTransformFunction.class, codeReference));
}
/*******************************************************************************
**
*******************************************************************************/
protected AbstractLoadFunction getLoadFunction(RunBackendStepInput runBackendStepInput)
{
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
return (QCodeLoader.getBackendStep(AbstractLoadFunction.class, codeReference));
}
}

View File

@ -0,0 +1,146 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamed.StreamedETLProcess;
/*******************************************************************************
** Backend step to do a execute a streamed ETL job
*******************************************************************************/
public class StreamedETLExecuteStep extends BaseStreamedETLStep implements BackendStep
{
/*******************************************************************************
**
*******************************************************************************/
@Override
@SuppressWarnings("checkstyle:indentation")
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
QBackendTransaction transaction = null;
try
{
///////////////////////////////////////////////////////
// set up the extract, transform, and load functions //
///////////////////////////////////////////////////////
RecordPipe recordPipe = new RecordPipe();
AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput);
extractFunction.setRecordPipe(recordPipe);
AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput);
AbstractLoadFunction loadFunction = getLoadFunction(runBackendStepInput);
transaction = loadFunction.openTransaction(runBackendStepInput);
List<QRecord> loadedRecordList = new ArrayList<>();
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractFunction", null, recordPipe, (status) ->
{
extractFunction.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput);
},
() -> (consumeRecordsFromPipe(recordPipe, transformFunction, loadFunction, runBackendStepInput, runBackendStepOutput, loadedRecordList))
);
runBackendStepOutput.addValue(StreamedETLProcess.FIELD_RECORD_COUNT, recordCount);
runBackendStepOutput.setRecords(loadedRecordList);
/////////////////////
// commit the work //
/////////////////////
transaction.commit();
}
catch(Exception e)
{
////////////////////////////////////////////////////////////////////////////////
// rollback the work, then re-throw the error for up-stream to catch & report //
////////////////////////////////////////////////////////////////////////////////
if(transaction != null)
{
transaction.rollback();
}
throw (e);
}
finally
{
////////////////////////////////////////////////////////////
// always close our transactions (e.g., jdbc connections) //
////////////////////////////////////////////////////////////
if(transaction != null)
{
transaction.close();
}
}
}
/*******************************************************************************
**
*******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, AbstractLoadFunction loadFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> loadedRecordList) throws QException
{
///////////////////////////////////
// get the records from the pipe //
///////////////////////////////////
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
/////////////////////////////////////////////////////
// pass the records through the transform function //
/////////////////////////////////////////////////////
transformFunction.setInputRecordPage(qRecords);
transformFunction.setOutputRecordPage(new ArrayList<>());
transformFunction.run(runBackendStepInput, runBackendStepOutput);
////////////////////////////////////////////////
// pass the records through the load function //
////////////////////////////////////////////////
loadFunction.setInputRecordPage(transformFunction.getOutputRecordPage());
loadFunction.setOutputRecordPage(new ArrayList<>());
loadFunction.run(runBackendStepInput, runBackendStepOutput);
///////////////////////////////////////////////////////
// copy a small number of records to the output list //
///////////////////////////////////////////////////////
int i = 0;
while(loadedRecordList.size() < IN_MEMORY_RECORD_LIMIT && i < loadFunction.getOutputRecordPage().size())
{
loadedRecordList.add(loadFunction.getOutputRecordPage().get(i++));
}
return (qRecords.size());
}
}

View File

@ -0,0 +1,95 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
/*******************************************************************************
** Backend step to do a preview of a full streamed ETL job
*******************************************************************************/
public class StreamedETLPreviewStep extends BaseStreamedETLStep implements BackendStep
{
/*******************************************************************************
**
*******************************************************************************/
@Override
@SuppressWarnings("checkstyle:indentation")
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
RecordPipe recordPipe = new RecordPipe();
AbstractExtractFunction extractFunction = getExtractFunction(runBackendStepInput);
extractFunction.setLimit(IN_MEMORY_RECORD_LIMIT); // todo - process field?
extractFunction.setRecordPipe(recordPipe);
AbstractTransformFunction transformFunction = getTransformFunction(runBackendStepInput);
List<QRecord> transformedRecordList = new ArrayList<>();
new AsyncRecordPipeLoop().run("StreamedETL>Preview>ExtractFunction", IN_MEMORY_RECORD_LIMIT, recordPipe, (status) ->
{
extractFunction.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput);
},
() -> (consumeRecordsFromPipe(recordPipe, transformFunction, runBackendStepInput, runBackendStepOutput, transformedRecordList))
);
runBackendStepOutput.setRecords(transformedRecordList);
}
/*******************************************************************************
**
*******************************************************************************/
private int consumeRecordsFromPipe(RecordPipe recordPipe, AbstractTransformFunction transformFunction, RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput, List<QRecord> transformedRecordList) throws QException
{
///////////////////////////////////
// get the records from the pipe //
///////////////////////////////////
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
/////////////////////////////////////////////////////
// pass the records through the transform function //
/////////////////////////////////////////////////////
transformFunction.setInputRecordPage(qRecords);
transformFunction.run(runBackendStepInput, runBackendStepOutput);
////////////////////////////////////////////////////
// add the transformed records to the output list //
////////////////////////////////////////////////////
transformedRecordList.addAll(transformFunction.getOutputRecordPage());
return (qRecords.size());
}
}

View File

@ -0,0 +1,89 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionInputMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
/*******************************************************************************
** Definition for Streamed ETL process that includes a frontend.
**
*******************************************************************************/
public class StreamedETLWithFrontendProcess
{
public static final String PROCESS_NAME = "etl.streamedWithFrontend";
public static final String STEP_NAME_PREVIEW = "preview";
public static final String STEP_NAME_REVIEW = "review";
public static final String STEP_NAME_EXECUTE = "execute";
public static final String STEP_NAME_RESULT = "result";
public static final String FIELD_EXTRACT_CODE = "extract";
public static final String FIELD_TRANSFORM_CODE = "transform";
public static final String FIELD_LOAD_CODE = "load";
public static final String FIELD_SOURCE_TABLE = "sourceTable";
public static final String FIELD_DESTINATION_TABLE = "destinationTable";
public static final String FIELD_MAPPING_JSON = "mappingJSON";
public static final String FIELD_RECORD_COUNT = "recordCount";
/*******************************************************************************
**
*******************************************************************************/
public QProcessMetaData defineProcessMetaData()
{
QStepMetaData previewStep = new QBackendStepMetaData()
.withName(STEP_NAME_PREVIEW)
.withCode(new QCodeReference(StreamedETLPreviewStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_EXTRACT_CODE))
.withField(new QFieldMetaData().withName(FIELD_TRANSFORM_CODE)));
QFrontendStepMetaData reviewStep = new QFrontendStepMetaData()
.withName(STEP_NAME_REVIEW);
QStepMetaData executeStep = new QBackendStepMetaData()
.withName(STEP_NAME_EXECUTE)
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
.withInputData(new QFunctionInputMetaData()
.withField(new QFieldMetaData().withName(FIELD_LOAD_CODE)));
QFrontendStepMetaData resultStep = new QFrontendStepMetaData()
.withName(STEP_NAME_RESULT);
return new QProcessMetaData()
.withName(PROCESS_NAME)
.addStep(previewStep)
.addStep(reviewStep)
.addStep(executeStep)
.addStep(resultStep);
}
}

View File

@ -29,6 +29,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.interfaces.QActionInterface;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
@ -40,13 +43,18 @@ import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import com.kingsrook.qqq.backend.module.rdbms.jdbc.ConnectionManager;
import com.kingsrook.qqq.backend.module.rdbms.model.metadata.RDBMSBackendMetaData;
import com.kingsrook.qqq.backend.module.rdbms.model.metadata.RDBMSTableBackendDetails;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/*******************************************************************************
** Base class for all core actions in the RDBMS module.
*******************************************************************************/
public abstract class AbstractRDBMSAction
public abstract class AbstractRDBMSAction implements QActionInterface
{
private static final Logger LOG = LogManager.getLogger(AbstractRDBMSAction.class);
/*******************************************************************************
** Get the table name to use in the RDBMS from a QTableMetaData.
@ -319,4 +327,26 @@ public abstract class AbstractRDBMSAction
{
return fieldType == QFieldType.STRING || fieldType == QFieldType.TEXT || fieldType == QFieldType.HTML || fieldType == QFieldType.PASSWORD;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public QBackendTransaction openTransaction(AbstractTableActionInput input) throws QException
{
try
{
LOG.info("Opening transaction");
Connection connection = getConnection(input);
return (new RDBMSTransaction(connection));
}
catch(Exception e)
{
throw new QException("Error opening transaction: " + e.getMessage(), e);
}
}
}

View File

@ -28,7 +28,6 @@ import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
import com.kingsrook.qqq.backend.core.actions.interfaces.InsertInterface;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
@ -160,26 +159,4 @@ public class RDBMSInsertAction extends AbstractRDBMSAction implements InsertInte
}
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public QBackendTransaction openTransaction(InsertInput insertInput) throws QException
{
try
{
LOG.info("Opening transaction");
Connection connection = getConnection(insertInput);
return (new RDBMSTransaction(connection));
}
catch(Exception e)
{
throw new QException("Error opening transaction: " + e.getMessage(), e);
}
}
}

View File

@ -123,6 +123,12 @@ public class RDBMSQueryAction extends AbstractRDBMSAction implements QueryInterf
}
queryOutput.addRecord(record);
if(queryInput.getAsyncJobCallback().wasCancelRequested())
{
LOG.info("Breaking query job, as requested.");
break;
}
}
}), params);

View File

@ -114,7 +114,22 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte
outputRecords.add(outputRecord);
}
try(Connection connection = getConnection(updateInput))
try
{
Connection connection;
boolean needToCloseConnection = false;
if(updateInput.getTransaction() != null && updateInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction)
{
LOG.debug("Using connection from insertInput [" + rdbmsTransaction.getConnection() + "]");
connection = rdbmsTransaction.getConnection();
}
else
{
connection = getConnection(updateInput);
needToCloseConnection = true;
}
try
{
/////////////////////////////////////////////////////////////////////////////////////////////
// process each distinct list of fields being updated (e.g., each different SQL statement) //
@ -123,6 +138,14 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte
{
updateRecordsWithMatchingListOfFields(updateInput, connection, table, recordsByFieldBeingUpdated.get(fieldsBeingUpdated), fieldsBeingUpdated);
}
}
finally
{
if(needToCloseConnection)
{
connection.close();
}
}
return rs;
}
@ -191,7 +214,6 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte
/*******************************************************************************
**
*******************************************************************************/
@ -276,6 +298,8 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte
return (true);
}
/*******************************************************************************
**
*******************************************************************************/
@ -285,5 +309,4 @@ public class RDBMSUpdateAction extends AbstractRDBMSAction implements UpdateInte
updateInput.getAsyncJobCallback().updateStatus(statusCounter, updateInput.getRecords().size());
}
}