mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 05:01:07 +00:00
QQQ-40 getting closer to production-ready on automations
This commit is contained in:
@ -135,11 +135,11 @@ public class AsyncJobManager
|
|||||||
Thread.currentThread().setName("Job:" + jobName + ":" + uuidAndTypeStateKey.getUuid().toString().substring(0, 8));
|
Thread.currentThread().setName("Job:" + jobName + ":" + uuidAndTypeStateKey.getUuid().toString().substring(0, 8));
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
LOG.info("Starting job " + uuidAndTypeStateKey.getUuid());
|
LOG.debug("Starting job " + uuidAndTypeStateKey.getUuid());
|
||||||
T result = asyncJob.run(new AsyncJobCallback(uuidAndTypeStateKey.getUuid(), asyncJobStatus));
|
T result = asyncJob.run(new AsyncJobCallback(uuidAndTypeStateKey.getUuid(), asyncJobStatus));
|
||||||
asyncJobStatus.setState(AsyncJobState.COMPLETE);
|
asyncJobStatus.setState(AsyncJobState.COMPLETE);
|
||||||
getStateProvider().put(uuidAndTypeStateKey, asyncJobStatus);
|
getStateProvider().put(uuidAndTypeStateKey, asyncJobStatus);
|
||||||
LOG.info("Completed job " + uuidAndTypeStateKey.getUuid());
|
LOG.debug("Completed job " + uuidAndTypeStateKey.getUuid());
|
||||||
return (result);
|
return (result);
|
||||||
}
|
}
|
||||||
catch(Exception e)
|
catch(Exception e)
|
||||||
|
@ -48,7 +48,7 @@ public class AsyncRecordPipeLoop
|
|||||||
///////////////////////////////////////////////////
|
///////////////////////////////////////////////////
|
||||||
AsyncJobManager asyncJobManager = new AsyncJobManager();
|
AsyncJobManager asyncJobManager = new AsyncJobManager();
|
||||||
String jobUUID = asyncJobManager.startJob(jobName, supplier::apply);
|
String jobUUID = asyncJobManager.startJob(jobName, supplier::apply);
|
||||||
LOG.info("Started supplier job [" + jobUUID + "] for record pipe.");
|
LOG.debug("Started supplier job [" + jobUUID + "] for record pipe.");
|
||||||
|
|
||||||
AsyncJobState jobState = AsyncJobState.RUNNING;
|
AsyncJobState jobState = AsyncJobState.RUNNING;
|
||||||
AsyncJobStatus asyncJobStatus = null;
|
AsyncJobStatus asyncJobStatus = null;
|
||||||
@ -66,7 +66,7 @@ public class AsyncRecordPipeLoop
|
|||||||
// if the pipe is too empty, sleep to let the producer work. //
|
// if the pipe is too empty, sleep to let the producer work. //
|
||||||
// todo - smarter sleep? like get notified vs. sleep? //
|
// todo - smarter sleep? like get notified vs. sleep? //
|
||||||
///////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////
|
||||||
LOG.debug("Too few records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work");
|
LOG.trace("Too few records are available in the pipe. Sleeping [" + nextSleepMillis + "] ms to give producer a chance to work");
|
||||||
SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS);
|
SleepUtils.sleep(nextSleepMillis, TimeUnit.MILLISECONDS);
|
||||||
nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS);
|
nextSleepMillis = Math.min(nextSleepMillis * 2, MAX_SLEEP_MS);
|
||||||
|
|
||||||
@ -85,7 +85,7 @@ public class AsyncRecordPipeLoop
|
|||||||
nextSleepMillis = INIT_SLEEP_MS;
|
nextSleepMillis = INIT_SLEEP_MS;
|
||||||
|
|
||||||
recordCount += consumer.get();
|
recordCount += consumer.get();
|
||||||
LOG.info(String.format("Processed %,d records so far", recordCount));
|
LOG.debug(String.format("Processed %,d records so far", recordCount));
|
||||||
|
|
||||||
if(recordLimit != null && recordCount >= recordLimit)
|
if(recordLimit != null && recordCount >= recordLimit)
|
||||||
{
|
{
|
||||||
@ -117,7 +117,7 @@ public class AsyncRecordPipeLoop
|
|||||||
jobState = asyncJobStatus.getState();
|
jobState = asyncJobStatus.getState();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Job [" + jobUUID + "] completed with status: " + asyncJobStatus);
|
LOG.debug("Job [" + jobUUID + "][" + jobName + "] completed with status: " + asyncJobStatus);
|
||||||
|
|
||||||
///////////////////////////////////
|
///////////////////////////////////
|
||||||
// propagate errors from the job //
|
// propagate errors from the job //
|
||||||
@ -133,8 +133,12 @@ public class AsyncRecordPipeLoop
|
|||||||
recordCount += consumer.get();
|
recordCount += consumer.get();
|
||||||
|
|
||||||
long endTime = System.currentTimeMillis();
|
long endTime = System.currentTimeMillis();
|
||||||
LOG.info(String.format("Processed %,d records", recordCount)
|
|
||||||
+ String.format(" at end of job in %,d ms (%.2f records/second).", (endTime - jobStartTime), 1000d * (recordCount / (.001d + (endTime - jobStartTime)))));
|
if(recordCount > 0)
|
||||||
|
{
|
||||||
|
LOG.info(String.format("Processed %,d records", recordCount)
|
||||||
|
+ String.format(" at end of job [%s] in %,d ms (%.2f records/second).", jobName, (endTime - jobStartTime), 1000d * (recordCount / (.001d + (endTime - jobStartTime)))));
|
||||||
|
}
|
||||||
|
|
||||||
return (recordCount);
|
return (recordCount);
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,9 @@
|
|||||||
package com.kingsrook.qqq.backend.core.actions.automation;
|
package com.kingsrook.qqq.backend.core.actions.automation;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
||||||
@ -10,6 +12,8 @@ import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
|||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTrackingType;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTrackingType;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TableAutomationAction;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TriggerEvent;
|
||||||
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||||
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
||||||
import org.apache.commons.lang.NotImplementedException;
|
import org.apache.commons.lang.NotImplementedException;
|
||||||
@ -37,7 +41,12 @@ public class RecordAutomationStatusUpdater
|
|||||||
return (false);
|
return (false);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(automationStatus.equals(AutomationStatus.PENDING_INSERT_AUTOMATIONS) || automationStatus.equals(AutomationStatus.PENDING_UPDATE_AUTOMATIONS))
|
if(canWeSkipPendingAndGoToOkay(table, automationStatus))
|
||||||
|
{
|
||||||
|
automationStatus = AutomationStatus.OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(automationStatus.equals(AutomationStatus.PENDING_UPDATE_AUTOMATIONS))
|
||||||
{
|
{
|
||||||
Exception e = new Exception();
|
Exception e = new Exception();
|
||||||
for(StackTraceElement stackTraceElement : e.getStackTrace())
|
for(StackTraceElement stackTraceElement : e.getStackTrace())
|
||||||
@ -45,7 +54,7 @@ public class RecordAutomationStatusUpdater
|
|||||||
String className = stackTraceElement.getClassName();
|
String className = stackTraceElement.getClassName();
|
||||||
if(className.contains("com.kingsrook.qqq.backend.core.actions.automation") && !className.equals(RecordAutomationStatusUpdater.class.getName()) && !className.endsWith("Test"))
|
if(className.contains("com.kingsrook.qqq.backend.core.actions.automation") && !className.equals(RecordAutomationStatusUpdater.class.getName()) && !className.endsWith("Test"))
|
||||||
{
|
{
|
||||||
LOG.info("Avoiding re-setting automation status to PENDING while running an automation");
|
LOG.debug("Avoiding re-setting automation status to PENDING_UPDATE while running an automation");
|
||||||
return (false);
|
return (false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -66,6 +75,35 @@ public class RecordAutomationStatusUpdater
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** If a table has no automation actions defined for Insert (or Update), and we're
|
||||||
|
** being asked to set status to PENDING_INSERT (or PENDING_UPDATE), then just
|
||||||
|
** move the status straight to OK.
|
||||||
|
*******************************************************************************/
|
||||||
|
private static boolean canWeSkipPendingAndGoToOkay(QTableMetaData table, AutomationStatus automationStatus)
|
||||||
|
{
|
||||||
|
List<TableAutomationAction> tableActions = Objects.requireNonNullElse(table.getAutomationDetails().getActions(), new ArrayList<>());
|
||||||
|
|
||||||
|
if(automationStatus.equals(AutomationStatus.PENDING_INSERT_AUTOMATIONS))
|
||||||
|
{
|
||||||
|
if(tableActions.stream().noneMatch(a -> TriggerEvent.POST_INSERT.equals(a.getTriggerEvent())))
|
||||||
|
{
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if(automationStatus.equals(AutomationStatus.PENDING_UPDATE_AUTOMATIONS))
|
||||||
|
{
|
||||||
|
if(tableActions.stream().noneMatch(a -> TriggerEvent.POST_UPDATE.equals(a.getTriggerEvent())))
|
||||||
|
{
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** for a list of records, update their automation status and actually Update the
|
** for a list of records, update their automation status and actually Update the
|
||||||
** backend as well.
|
** backend as well.
|
||||||
@ -81,7 +119,17 @@ public class RecordAutomationStatusUpdater
|
|||||||
UpdateInput updateInput = new UpdateInput(instance);
|
UpdateInput updateInput = new UpdateInput(instance);
|
||||||
updateInput.setSession(session);
|
updateInput.setSession(session);
|
||||||
updateInput.setTableName(table.getName());
|
updateInput.setTableName(table.getName());
|
||||||
updateInput.setRecords(records);
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// build records with just their pkey & status field for this update, to avoid //
|
||||||
|
// changing other values (relies on assumption of Patch semantics in UpdateAction) //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
updateInput.setRecords(records.stream().map(r -> new QRecord()
|
||||||
|
.withTableName(r.getTableName())
|
||||||
|
.withValue(table.getPrimaryKeyField(), r.getValue(table.getPrimaryKeyField()))
|
||||||
|
.withValue(automationDetails.getStatusTracking().getFieldName(), r.getValue(automationDetails.getStatusTracking().getFieldName()))).toList());
|
||||||
|
updateInput.setAreAllValuesBeingUpdatedTheSame(true);
|
||||||
|
|
||||||
new UpdateAction().execute(updateInput);
|
new UpdateAction().execute(updateInput);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,33 +1,41 @@
|
|||||||
package com.kingsrook.qqq.backend.core.actions.automation.polling;
|
package com.kingsrook.qqq.backend.core.actions.automation.polling;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
|
||||||
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
|
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
|
||||||
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
|
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
|
||||||
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationStatusUpdater;
|
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationStatusUpdater;
|
||||||
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
|
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterOrderBy;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
|
|
||||||
import com.kingsrook.qqq.backend.core.model.automation.RecordAutomationInput;
|
import com.kingsrook.qqq.backend.core.model.automation.RecordAutomationInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TableAutomationAction;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TableAutomationAction;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TriggerEvent;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TriggerEvent;
|
||||||
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||||
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
|
||||||
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
||||||
import com.kingsrook.qqq.backend.core.utils.StringUtils;
|
import com.kingsrook.qqq.backend.core.utils.StringUtils;
|
||||||
import org.apache.commons.lang.NotImplementedException;
|
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
@ -40,8 +48,8 @@ class PollingAutomationRunner implements Runnable
|
|||||||
{
|
{
|
||||||
private static final Logger LOG = LogManager.getLogger(PollingAutomationRunner.class);
|
private static final Logger LOG = LogManager.getLogger(PollingAutomationRunner.class);
|
||||||
|
|
||||||
private QInstance instance;
|
private QInstance instance;
|
||||||
private String providerName;
|
private String providerName;
|
||||||
private Supplier<QSession> sessionSupplier;
|
private Supplier<QSession> sessionSupplier;
|
||||||
|
|
||||||
private List<QTableMetaData> managedTables = new ArrayList<>();
|
private List<QTableMetaData> managedTables = new ArrayList<>();
|
||||||
@ -130,7 +138,7 @@ class PollingAutomationRunner implements Runnable
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
** Query for and process records that have a PENDING status on a given table.
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
private void processTable(QTableMetaData table) throws QException
|
private void processTable(QTableMetaData table) throws QException
|
||||||
{
|
{
|
||||||
@ -142,7 +150,7 @@ class PollingAutomationRunner implements Runnable
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
** Query for and process records that have a PENDING_INSERT or PENDING_UPDATE status on a given table.
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
private void processTableInsertOrUpdate(QTableMetaData table, QSession session, boolean isInsert) throws QException
|
private void processTableInsertOrUpdate(QTableMetaData table, QSession session, boolean isInsert) throws QException
|
||||||
{
|
{
|
||||||
@ -153,88 +161,185 @@ class PollingAutomationRunner implements Runnable
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info(" Query for records " + automationStatus + " in " + table);
|
LOG.debug(" Query for records " + automationStatus + " in " + table);
|
||||||
|
|
||||||
QueryInput queryInput = new QueryInput(instance);
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
queryInput.setSession(session); // todo - where the heck can we get this from??
|
// run an async-pipe loop - that will query for records in PENDING - put them in a pipe - then apply actions to them //
|
||||||
queryInput.setTableName(table.getName());
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
RecordPipe recordPipe = new RecordPipe();
|
||||||
|
AsyncRecordPipeLoop asyncRecordPipeLoop = new AsyncRecordPipeLoop();
|
||||||
|
asyncRecordPipeLoop.run("PollingAutomationRunner>Query>" + (isInsert ? "insert" : "update"), null, recordPipe, (status) ->
|
||||||
|
{
|
||||||
|
QueryInput queryInput = new QueryInput(instance);
|
||||||
|
queryInput.setSession(session);
|
||||||
|
queryInput.setTableName(table.getName());
|
||||||
|
queryInput.setFilter(new QQueryFilter().withCriteria(new QFilterCriteria(table.getAutomationDetails().getStatusTracking().getFieldName(), QCriteriaOperator.IN, List.of(automationStatus.getId()))));
|
||||||
|
queryInput.setRecordPipe(recordPipe);
|
||||||
|
return (new QueryAction().execute(queryInput));
|
||||||
|
}, () ->
|
||||||
|
{
|
||||||
|
List<QRecord> records = recordPipe.consumeAvailableRecords();
|
||||||
|
applyActionsToRecords(session, table, records, actions, isInsert);
|
||||||
|
return (records.size());
|
||||||
|
}
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** For a set of records that were found to be in a PENDING state - run all the
|
||||||
|
** table's actions against them.
|
||||||
|
*******************************************************************************/
|
||||||
|
private void applyActionsToRecords(QSession session, QTableMetaData table, List<QRecord> records, List<TableAutomationAction> actions, boolean isInsert) throws QException
|
||||||
|
{
|
||||||
|
if(CollectionUtils.nullSafeIsEmpty(records))
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////
|
||||||
|
// mark the records as RUNNING their automations //
|
||||||
|
///////////////////////////////////////////////////
|
||||||
|
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, isInsert ? AutomationStatus.RUNNING_INSERT_AUTOMATIONS : AutomationStatus.RUNNING_UPDATE_AUTOMATIONS);
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// foreach action - run it against the records (but only if they match the action's filter, if there is one) //
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
boolean anyActionsFailed = false;
|
||||||
for(TableAutomationAction action : actions)
|
for(TableAutomationAction action : actions)
|
||||||
{
|
{
|
||||||
QQueryFilter filter = action.getFilter();
|
try
|
||||||
if(filter == null)
|
|
||||||
{
|
{
|
||||||
filter = new QQueryFilter();
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// note - this method - will re-query the objects, so we should have confidence that their data is fresh... //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
List<QRecord> matchingQRecords = getRecordsMatchingActionFilter(session, table, records, action);
|
||||||
|
LOG.debug("Of the {} records that were pending automations, {} of them match the filter on the action {}", records.size(), matchingQRecords.size(), action);
|
||||||
|
if(CollectionUtils.nullSafeHasContents(matchingQRecords))
|
||||||
|
{
|
||||||
|
LOG.debug(" Processing " + matchingQRecords.size() + " records in " + table + " for action " + action);
|
||||||
|
applyActionToMatchingRecords(session, table, matchingQRecords, action);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
catch(Exception e)
|
||||||
filter.addCriteria(new QFilterCriteria(table.getAutomationDetails().getStatusTracking().getFieldName(), QCriteriaOperator.IN, List.of(automationStatus.getId())));
|
|
||||||
queryInput.setFilter(filter);
|
|
||||||
|
|
||||||
QueryOutput queryOutput = new QueryAction().execute(queryInput);
|
|
||||||
|
|
||||||
// todo - pipe this query!!
|
|
||||||
|
|
||||||
if(CollectionUtils.nullSafeHasContents(queryOutput.getRecords()))
|
|
||||||
{
|
{
|
||||||
LOG.info(" Processing " + queryOutput.getRecords().size() + " records in " + table + " for action " + action);
|
LOG.warn("Caught exception processing records on " + table + " for action " + action, e);
|
||||||
processRecords(table, actions, queryOutput.getRecords(), session, isInsert);
|
anyActionsFailed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
////////////////////////////////////////
|
||||||
|
// update status on all these records //
|
||||||
|
////////////////////////////////////////
|
||||||
|
if(anyActionsFailed)
|
||||||
|
{
|
||||||
|
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, AutomationStatus.FAILED_UPDATE_AUTOMATIONS);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, AutomationStatus.OK);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
|
** For a given action, and a list of records - return a new list, of the ones
|
||||||
|
** which match the action's filter (if there is one - if not, then all match).
|
||||||
**
|
**
|
||||||
|
** Note that this WILL re-query the objects (ALWAYS - even if the action has no filter).
|
||||||
|
** This has the nice side effect of always giving fresh/updated records, despite having
|
||||||
|
** some cost.
|
||||||
|
**
|
||||||
|
** At one point, we considered just applying the filter using java-comparisons,
|
||||||
|
** but that will almost certainly give potentially different results than a true
|
||||||
|
** backend - e.g., just consider if the DB is case-sensitive for strings...
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
private void processRecords(QTableMetaData table, List<TableAutomationAction> actions, List<QRecord> records, QSession session, boolean isInsert) throws QException
|
private List<QRecord> getRecordsMatchingActionFilter(QSession session, QTableMetaData table, List<QRecord> records, TableAutomationAction action) throws QException
|
||||||
{
|
{
|
||||||
try
|
QueryInput queryInput = new QueryInput(instance);
|
||||||
{
|
queryInput.setSession(session);
|
||||||
updateRecordAutomationStatus(table, session, records, isInsert ? AutomationStatus.RUNNING_INSERT_AUTOMATIONS : AutomationStatus.RUNNING_UPDATE_AUTOMATIONS);
|
queryInput.setTableName(table.getName());
|
||||||
|
|
||||||
for(TableAutomationAction action : actions)
|
QQueryFilter filter = new QQueryFilter();
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// copy filter criteria from the action's filter to a new filter that we'll run here. //
|
||||||
|
// Critically - don't modify the filter object on the action! as that object has a long lifespan. //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
if(action.getFilter() != null)
|
||||||
|
{
|
||||||
|
if(action.getFilter().getCriteria() != null)
|
||||||
{
|
{
|
||||||
////////////////////////////////////
|
action.getFilter().getCriteria().forEach(filter::addCriteria);
|
||||||
// todo - what, re-query them? :( //
|
|
||||||
////////////////////////////////////
|
|
||||||
if(StringUtils.hasContent(action.getProcessName()))
|
|
||||||
{
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////
|
|
||||||
// todo - uh, how to make these records the input, where an extract step might be involved? //
|
|
||||||
// should extract step ... see record list and just use it? i think maybe? //
|
|
||||||
//////////////////////////////////////////////////////////////////////////////////////////////
|
|
||||||
throw (new NotImplementedException("processes for automation not yet implemented"));
|
|
||||||
}
|
|
||||||
else if(action.getCodeReference() != null)
|
|
||||||
{
|
|
||||||
LOG.info(" Executing action: [" + action.getName() + "] as code reference: " + action.getCodeReference());
|
|
||||||
RecordAutomationInput input = new RecordAutomationInput(instance);
|
|
||||||
input.setSession(session);
|
|
||||||
input.setTableName(table.getName());
|
|
||||||
input.setRecordList(records);
|
|
||||||
|
|
||||||
RecordAutomationHandler recordAutomationHandler = QCodeLoader.getRecordAutomationHandler(action);
|
|
||||||
recordAutomationHandler.execute(input);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if(action.getFilter().getOrderBys() != null)
|
||||||
|
{
|
||||||
|
action.getFilter().getOrderBys().forEach(filter::addOrderBy);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
updateRecordAutomationStatus(table, session, records, AutomationStatus.OK);
|
filter.addCriteria(new QFilterCriteria(table.getPrimaryKeyField(), QCriteriaOperator.IN, records.stream().map(r -> r.getValue(table.getPrimaryKeyField())).toList()));
|
||||||
}
|
|
||||||
catch(Exception e)
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
{
|
// always add order-by the primary key, to give more predictable/consistent results //
|
||||||
updateRecordAutomationStatus(table, session, records, isInsert ? AutomationStatus.FAILED_INSERT_AUTOMATIONS : AutomationStatus.FAILED_UPDATE_AUTOMATIONS);
|
// todo - in future - if this becomes a source of slowness, make this a config to opt-out? //
|
||||||
}
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
filter.addOrderBy(new QFilterOrderBy().withFieldName(table.getPrimaryKeyField()));
|
||||||
|
|
||||||
|
queryInput.setFilter(filter);
|
||||||
|
|
||||||
|
return (new QueryAction().execute(queryInput).getRecords());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
** Finally, actually run action code against a list of known matching records.
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
private void updateRecordAutomationStatus(QTableMetaData table, QSession session, List<QRecord> records, AutomationStatus automationStatus) throws QException
|
private void applyActionToMatchingRecords(QSession session, QTableMetaData table, List<QRecord> records, TableAutomationAction action) throws Exception
|
||||||
{
|
{
|
||||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, automationStatus);
|
if(StringUtils.hasContent(action.getProcessName()))
|
||||||
|
{
|
||||||
|
RunProcessInput runProcessInput = new RunProcessInput(instance);
|
||||||
|
runProcessInput.setSession(session);
|
||||||
|
runProcessInput.setProcessName(action.getProcessName());
|
||||||
|
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// kinda hacky - if we see that this process has an input field of a given name, then put a filter in there to find these records... //
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
QProcessMetaData process = instance.getProcess(action.getProcessName());
|
||||||
|
if(process.getInputFields().stream().anyMatch(f -> f.getName().equals(StreamedETLWithFrontendProcess.FIELD_DEFAULT_QUERY_FILTER)))
|
||||||
|
{
|
||||||
|
List<Serializable> recordIds = records.stream().map(r -> r.getValueInteger(table.getPrimaryKeyField())).collect(Collectors.toList());
|
||||||
|
QQueryFilter queryFilter = new QQueryFilter().withCriteria(new QFilterCriteria(table.getPrimaryKeyField(), QCriteriaOperator.IN, recordIds));
|
||||||
|
runProcessInput.addValue(StreamedETLWithFrontendProcess.FIELD_DEFAULT_QUERY_FILTER, queryFilter);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
runProcessInput.setRecords(records);
|
||||||
|
}
|
||||||
|
|
||||||
|
RunProcessAction runProcessAction = new RunProcessAction();
|
||||||
|
RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput);
|
||||||
|
if(runProcessOutput.getException().isPresent())
|
||||||
|
{
|
||||||
|
throw (runProcessOutput.getException().get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if(action.getCodeReference() != null)
|
||||||
|
{
|
||||||
|
LOG.debug(" Executing action: [" + action.getName() + "] as code reference: " + action.getCodeReference());
|
||||||
|
RecordAutomationInput input = new RecordAutomationInput(instance);
|
||||||
|
input.setSession(session);
|
||||||
|
input.setTableName(table.getName());
|
||||||
|
input.setRecordList(records);
|
||||||
|
|
||||||
|
RecordAutomationHandler recordAutomationHandler = QCodeLoader.getRecordAutomationHandler(action);
|
||||||
|
recordAutomationHandler.execute(input);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -118,7 +118,7 @@ public class RecordPipe
|
|||||||
LOG.warn("Giving up adding record to pipe, due to pipe being full for more than {} millis", MAX_SLEEP_LOOP_MILLIS);
|
LOG.warn("Giving up adding record to pipe, due to pipe being full for more than {} millis", MAX_SLEEP_LOOP_MILLIS);
|
||||||
throw (new IllegalStateException("Giving up adding record to pipe, due to pipe staying full too long."));
|
throw (new IllegalStateException("Giving up adding record to pipe, due to pipe staying full too long."));
|
||||||
}
|
}
|
||||||
LOG.debug("Record pipe.add failed (due to full pipe). Blocking.");
|
LOG.trace("Record pipe.add failed (due to full pipe). Blocking.");
|
||||||
SleepUtils.sleep(BLOCKING_SLEEP_MILLIS, TimeUnit.MILLISECONDS);
|
SleepUtils.sleep(BLOCKING_SLEEP_MILLIS, TimeUnit.MILLISECONDS);
|
||||||
offerResult = queue.offer(record);
|
offerResult = queue.offer(record);
|
||||||
now = System.currentTimeMillis();
|
now = System.currentTimeMillis();
|
||||||
|
@ -104,6 +104,17 @@ public class QRecord implements Serializable
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return "QRecord{tableName='" + tableName + "',id='" + getValue("id") + "'}";
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
@ -24,8 +24,10 @@ package com.kingsrook.qqq.backend.core.model.metadata.processes;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.layout.QAppChildMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.layout.QAppChildMetaData;
|
||||||
@ -273,17 +275,25 @@ public class QProcessMetaData implements QAppChildMetaData
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Get a list of all of the input fields used by all the steps in this process.
|
** Get a list of all the *unique* input fields used by all the steps in this process.
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
public List<QFieldMetaData> getInputFields()
|
public List<QFieldMetaData> getInputFields()
|
||||||
{
|
{
|
||||||
List<QFieldMetaData> rs = new ArrayList<>();
|
Set<String> usedFieldNames = new HashSet<>();
|
||||||
|
List<QFieldMetaData> rs = new ArrayList<>();
|
||||||
if(steps != null)
|
if(steps != null)
|
||||||
{
|
{
|
||||||
for(QStepMetaData step : steps.values())
|
for(QStepMetaData step : steps.values())
|
||||||
{
|
{
|
||||||
rs.addAll(step.getInputFields());
|
for(QFieldMetaData field : step.getInputFields())
|
||||||
|
{
|
||||||
|
if(!usedFieldNames.contains(field.getName()))
|
||||||
|
{
|
||||||
|
rs.add(field);
|
||||||
|
usedFieldNames.add(field.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (rs);
|
return (rs);
|
||||||
@ -292,17 +302,25 @@ public class QProcessMetaData implements QAppChildMetaData
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Get a list of all of the output fields used by all the steps in this process.
|
** Get a list of all the *unique* output fields used by all the steps in this process.
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@JsonIgnore
|
@JsonIgnore
|
||||||
public List<QFieldMetaData> getOutputFields()
|
public List<QFieldMetaData> getOutputFields()
|
||||||
{
|
{
|
||||||
List<QFieldMetaData> rs = new ArrayList<>();
|
Set<String> usedFieldNames = new HashSet<>();
|
||||||
|
List<QFieldMetaData> rs = new ArrayList<>();
|
||||||
if(steps != null)
|
if(steps != null)
|
||||||
{
|
{
|
||||||
for(QStepMetaData step : steps.values())
|
for(QStepMetaData step : steps.values())
|
||||||
{
|
{
|
||||||
rs.addAll(step.getOutputFields());
|
for(QFieldMetaData field : step.getOutputFields())
|
||||||
|
{
|
||||||
|
if(!usedFieldNames.contains(field.getName()))
|
||||||
|
{
|
||||||
|
rs.add(field);
|
||||||
|
usedFieldNames.add(field.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return (rs);
|
return (rs);
|
||||||
|
@ -35,6 +35,10 @@ import com.kingsrook.qqq.backend.core.modules.backend.QBackendModuleInterface;
|
|||||||
** A simple (probably only valid for testing?) implementation of the QModuleInterface,
|
** A simple (probably only valid for testing?) implementation of the QModuleInterface,
|
||||||
** that just stores its records in-memory.
|
** that just stores its records in-memory.
|
||||||
**
|
**
|
||||||
|
** In general, this class is intended to behave, as much as possible, like an RDBMS.
|
||||||
|
**
|
||||||
|
** TODO - in future, if we need to - make configs for things like "case-insensitive",
|
||||||
|
** and "allow loose typing".
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public class MemoryBackendModule implements QBackendModuleInterface
|
public class MemoryBackendModule implements QBackendModuleInterface
|
||||||
{
|
{
|
||||||
|
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.modules.backend.implementations.memory;
|
|||||||
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.LocalDate;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
@ -32,12 +33,14 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountInput;
|
|||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
|
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||||
|
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
||||||
import org.apache.commons.lang.NotImplementedException;
|
import org.apache.commons.lang.NotImplementedException;
|
||||||
|
|
||||||
|
|
||||||
@ -121,42 +124,7 @@ public class MemoryRecordStore
|
|||||||
|
|
||||||
for(QRecord qRecord : tableData.values())
|
for(QRecord qRecord : tableData.values())
|
||||||
{
|
{
|
||||||
boolean recordMatches = true;
|
boolean recordMatches = doesRecordMatch(input.getFilter(), qRecord);
|
||||||
if(input.getFilter() != null && input.getFilter().getCriteria() != null)
|
|
||||||
{
|
|
||||||
for(QFilterCriteria criterion : input.getFilter().getCriteria())
|
|
||||||
{
|
|
||||||
String fieldName = criterion.getFieldName();
|
|
||||||
Serializable value = qRecord.getValue(fieldName);
|
|
||||||
switch(criterion.getOperator())
|
|
||||||
{
|
|
||||||
case EQUALS:
|
|
||||||
{
|
|
||||||
if(!value.equals(criterion.getValues().get(0)))
|
|
||||||
{
|
|
||||||
recordMatches = false;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case IN:
|
|
||||||
{
|
|
||||||
if(!criterion.getValues().contains(value))
|
|
||||||
{
|
|
||||||
recordMatches = false;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
{
|
|
||||||
throw new NotImplementedException("Operator [" + criterion.getOperator() + "] is not yet implemented in the Memory backend.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if(!recordMatches)
|
|
||||||
{
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(recordMatches)
|
if(recordMatches)
|
||||||
{
|
{
|
||||||
@ -169,6 +137,217 @@ public class MemoryRecordStore
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private boolean doesRecordMatch(QQueryFilter filter, QRecord qRecord)
|
||||||
|
{
|
||||||
|
boolean recordMatches = true;
|
||||||
|
if(filter != null && filter.getCriteria() != null)
|
||||||
|
{
|
||||||
|
for(QFilterCriteria criterion : filter.getCriteria())
|
||||||
|
{
|
||||||
|
String fieldName = criterion.getFieldName();
|
||||||
|
Serializable value = qRecord.getValue(fieldName);
|
||||||
|
|
||||||
|
switch(criterion.getOperator())
|
||||||
|
{
|
||||||
|
case EQUALS:
|
||||||
|
{
|
||||||
|
recordMatches = testEquals(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case NOT_EQUALS:
|
||||||
|
{
|
||||||
|
recordMatches = !testEquals(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case IN:
|
||||||
|
{
|
||||||
|
recordMatches = testIn(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case NOT_IN:
|
||||||
|
{
|
||||||
|
recordMatches = !testIn(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case CONTAINS:
|
||||||
|
{
|
||||||
|
recordMatches = testContains(criterion, fieldName, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case NOT_CONTAINS:
|
||||||
|
{
|
||||||
|
recordMatches = !testContains(criterion, fieldName, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case GREATER_THAN:
|
||||||
|
{
|
||||||
|
recordMatches = testGreaterThan(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case GREATER_THAN_OR_EQUALS:
|
||||||
|
{
|
||||||
|
recordMatches = testGreaterThan(criterion, value) || testEquals(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case LESS_THAN:
|
||||||
|
{
|
||||||
|
recordMatches = !testGreaterThan(criterion, value) && !testEquals(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case LESS_THAN_OR_EQUALS:
|
||||||
|
{
|
||||||
|
recordMatches = !testGreaterThan(criterion, value);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
throw new NotImplementedException("Operator [" + criterion.getOperator() + "] is not yet implemented in the Memory backend.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if(!recordMatches)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return recordMatches;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private boolean testGreaterThan(QFilterCriteria criterion, Serializable value)
|
||||||
|
{
|
||||||
|
Serializable criterionValue = criterion.getValues().get(0);
|
||||||
|
if(criterionValue == null)
|
||||||
|
{
|
||||||
|
throw (new IllegalArgumentException("Missing criterion value in query"));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (value == null)
|
||||||
|
{
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// a database would say 'false' for if a null column is > a value, so do the same. //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(value instanceof LocalDate valueDate && criterionValue instanceof LocalDate criterionValueDate)
|
||||||
|
{
|
||||||
|
return (valueDate.isAfter(criterionValueDate));
|
||||||
|
}
|
||||||
|
|
||||||
|
if(value instanceof Number valueNumber && criterionValue instanceof Number criterionValueNumber)
|
||||||
|
{
|
||||||
|
return (valueNumber.doubleValue() > criterionValueNumber.doubleValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
throw (new NotImplementedException("Greater/Less Than comparisons are not (yet?) implemented for the supplied types [" + value.getClass().getSimpleName() + "][" + criterionValue.getClass().getSimpleName() + "]"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private boolean testIn(QFilterCriteria criterion, Serializable value)
|
||||||
|
{
|
||||||
|
if(!criterion.getValues().contains(value))
|
||||||
|
{
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private boolean testEquals(QFilterCriteria criterion, Serializable value)
|
||||||
|
{
|
||||||
|
if(value == null)
|
||||||
|
{
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!value.equals(criterion.getValues().get(0)))
|
||||||
|
{
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private boolean testContains(QFilterCriteria criterion, String fieldName, Serializable value)
|
||||||
|
{
|
||||||
|
String stringValue = getStringFieldValue(value, fieldName, criterion);
|
||||||
|
String criterionValue = getFirstStringCriterionValue(criterion);
|
||||||
|
|
||||||
|
if(!stringValue.contains(criterionValue))
|
||||||
|
{
|
||||||
|
return (false);
|
||||||
|
}
|
||||||
|
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private String getFirstStringCriterionValue(QFilterCriteria criteria)
|
||||||
|
{
|
||||||
|
if(CollectionUtils.nullSafeIsEmpty(criteria.getValues()))
|
||||||
|
{
|
||||||
|
throw new IllegalArgumentException("Missing value for [" + criteria.getOperator() + "] criteria on field [" + criteria.getFieldName() + "]");
|
||||||
|
}
|
||||||
|
Serializable value = criteria.getValues().get(0);
|
||||||
|
if(value == null)
|
||||||
|
{
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!(value instanceof String stringValue))
|
||||||
|
{
|
||||||
|
throw new ClassCastException("Value [" + value + "] for criteria [" + criteria.getFieldName() + "] is not a String, which is required for the [" + criteria.getOperator() + "] operator.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (stringValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private String getStringFieldValue(Serializable value, String fieldName, QFilterCriteria criterion)
|
||||||
|
{
|
||||||
|
if(value == null)
|
||||||
|
{
|
||||||
|
return "";
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!(value instanceof String stringValue))
|
||||||
|
{
|
||||||
|
throw new ClassCastException("Value [" + value + "] in field [" + fieldName + "] is not a String, which is required for the [" + criterion.getOperator() + "] operator.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return (stringValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
@ -65,6 +65,15 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////
|
||||||
|
// if we're running inside an automation, then skip this step. //
|
||||||
|
/////////////////////////////////////////////////////////////////
|
||||||
|
if(runningWithinAutomation())
|
||||||
|
{
|
||||||
|
LOG.info("Skipping preview step when [" + runBackendStepInput.getProcessName() + "] is running as part of an automation.");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////
|
///////////////////////////////////////////
|
||||||
// request a count from the extract step //
|
// request a count from the extract step //
|
||||||
///////////////////////////////////////////
|
///////////////////////////////////////////
|
||||||
@ -109,6 +118,25 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private boolean runningWithinAutomation()
|
||||||
|
{
|
||||||
|
Exception e = new Exception();
|
||||||
|
for(StackTraceElement stackTraceElement : e.getStackTrace())
|
||||||
|
{
|
||||||
|
String className = stackTraceElement.getClassName();
|
||||||
|
if(className.contains("com.kingsrook.qqq.backend.core.actions.automation"))
|
||||||
|
{
|
||||||
|
return (true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
@ -2,11 +2,11 @@
|
|||||||
<Configuration>
|
<Configuration>
|
||||||
<Appenders>
|
<Appenders>
|
||||||
<Console name="SystemOutAppender" target="SYSTEM_OUT">
|
<Console name="SystemOutAppender" target="SYSTEM_OUT">
|
||||||
<LevelRangeFilter minLevel="ERROR" maxLevel="all" onMatch="ACCEPT" onMismatch="DENY"/>
|
<LevelRangeFilter minLevel="ERROR" maxLevel="DEBUG" onMatch="ACCEPT" onMismatch="DENY"/>
|
||||||
<PatternLayout pattern="%highlight{%date{ISO8601} | %relative | %level | %threadName | %logger{1} | %message%n}"/>
|
<PatternLayout pattern="%highlight{%date{ISO8601} | %relative | %level | %threadName | %logger{1} | %message%n}"/>
|
||||||
</Console>
|
</Console>
|
||||||
<Syslog name="SyslogAppender" format="RFC5424" host="localhost" port="514" protocol="UDP" appName="qqq" facility="LOCAL0">
|
<Syslog name="SyslogAppender" format="RFC5424" host="localhost" port="514" protocol="UDP" appName="qqq" facility="LOCAL0">
|
||||||
<LevelRangeFilter minLevel="ERROR" maxLevel="all" onMatch="ACCEPT" onMismatch="DENY"/>
|
<LevelRangeFilter minLevel="ERROR" maxLevel="INFO" onMatch="ACCEPT" onMismatch="DENY"/>
|
||||||
<PatternLayout pattern="%date{ISO8601} | %relative | %level | %threadName | %logger{1} | %message%n"/>
|
<PatternLayout pattern="%date{ISO8601} | %relative | %level | %threadName | %logger{1} | %message%n"/>
|
||||||
</Syslog>
|
</Syslog>
|
||||||
<File name="LogFileAppender" fileName="log/qqq.log">
|
<File name="LogFileAppender" fileName="log/qqq.log">
|
||||||
|
@ -11,12 +11,10 @@ import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
|
|||||||
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
|
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
|
||||||
import com.kingsrook.qqq.backend.core.model.automation.RecordAutomationInput;
|
import com.kingsrook.qqq.backend.core.model.automation.RecordAutomationInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||||
@ -99,56 +97,6 @@ class PollingAutomationExecutorTest
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
@Test
|
|
||||||
void testUpdate() throws QException
|
|
||||||
{
|
|
||||||
QInstance qInstance = TestUtils.defineInstance();
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
|
||||||
// insert 2 people - one who should be logged by logger-on-update automation //
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
|
||||||
InsertInput insertInput = new InsertInput(qInstance);
|
|
||||||
insertInput.setSession(new QSession());
|
|
||||||
insertInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
|
||||||
insertInput.setRecords(List.of(
|
|
||||||
new QRecord().withValue("id", 1).withValue("firstName", "Tim"),
|
|
||||||
new QRecord().withValue("id", 2).withValue("firstName", "Darin")
|
|
||||||
));
|
|
||||||
new InsertAction().execute(insertInput);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////
|
|
||||||
// have the polling executor run "for awhile" //
|
|
||||||
////////////////////////////////////////////////
|
|
||||||
runPollingAutomationExecutorForAwhile(qInstance);
|
|
||||||
|
|
||||||
//////////////////////////////////////////////////
|
|
||||||
// assert that the update-automation didn't run //
|
|
||||||
//////////////////////////////////////////////////
|
|
||||||
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
|
|
||||||
|
|
||||||
UpdateInput updateInput = new UpdateInput(qInstance);
|
|
||||||
updateInput.setSession(new QSession());
|
|
||||||
updateInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
|
||||||
updateInput.setRecords(List.of(
|
|
||||||
new QRecord().withValue("id", 1).withValue("lastName", "now with a LastName"),
|
|
||||||
new QRecord().withValue("id", 2).withValue("lastName", "now with a LastName")
|
|
||||||
));
|
|
||||||
new UpdateAction().execute(updateInput);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////
|
|
||||||
// have the polling executor run "for awhile" //
|
|
||||||
////////////////////////////////////////////////
|
|
||||||
runPollingAutomationExecutorForAwhile(qInstance);
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////
|
|
||||||
// assert that the update-automation DID run now //
|
|
||||||
///////////////////////////////////////////////////
|
|
||||||
assertThat(TestUtils.LogPersonUpdate.updatedIds).contains(2);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -166,14 +114,14 @@ class PollingAutomationExecutorTest
|
|||||||
.getAutomationDetails().getActions().get(0)
|
.getAutomationDetails().getActions().get(0)
|
||||||
.setCodeReference(new QCodeReference(CaptureSessionIdAutomationHandler.class));
|
.setCodeReference(new QCodeReference(CaptureSessionIdAutomationHandler.class));
|
||||||
|
|
||||||
/////////////////////
|
////////////////////////////////////////////////////////////
|
||||||
// insert a person //
|
// insert a person that will trigger the on-insert action //
|
||||||
/////////////////////
|
////////////////////////////////////////////////////////////
|
||||||
InsertInput insertInput = new InsertInput(qInstance);
|
InsertInput insertInput = new InsertInput(qInstance);
|
||||||
insertInput.setSession(new QSession());
|
insertInput.setSession(new QSession());
|
||||||
insertInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
insertInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
insertInput.setRecords(List.of(
|
insertInput.setRecords(List.of(
|
||||||
new QRecord().withValue("id", 1).withValue("firstName", "Tim")
|
new QRecord().withValue("id", 1).withValue("firstName", "Tim").withValue("birthDate", LocalDate.now())
|
||||||
));
|
));
|
||||||
new InsertAction().execute(insertInput);
|
new InsertAction().execute(insertInput);
|
||||||
|
|
||||||
|
@ -0,0 +1,226 @@
|
|||||||
|
package com.kingsrook.qqq.backend.core.actions.automation.polling;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.Month;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Optional;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
||||||
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||||
|
import com.kingsrook.qqq.backend.core.modules.backend.implementations.memory.MemoryRecordStore;
|
||||||
|
import com.kingsrook.qqq.backend.core.utils.TestUtils;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Unit test for PollingAutomationRunner
|
||||||
|
*******************************************************************************/
|
||||||
|
class PollingAutomationRunnerTest
|
||||||
|
{
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@BeforeEach
|
||||||
|
@AfterEach
|
||||||
|
void beforeAndAfterEach()
|
||||||
|
{
|
||||||
|
MemoryRecordStore.getInstance().reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Test a cycle that does an insert, some automations, then and an update, and more automations.
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testInsertAndUpdate() throws QException
|
||||||
|
{
|
||||||
|
QInstance qInstance = TestUtils.defineInstance();
|
||||||
|
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, null);
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// insert 2 person records, one who should be both updated by the insert action, and should be logged by logger-on-update automation //
|
||||||
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
InsertInput insertInput = new InsertInput(qInstance);
|
||||||
|
insertInput.setSession(new QSession());
|
||||||
|
insertInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
|
insertInput.setRecords(List.of(
|
||||||
|
new QRecord().withValue("id", 1).withValue("firstName", "Tim").withValue("birthDate", LocalDate.now()),
|
||||||
|
new QRecord().withValue("id", 2).withValue("firstName", "Darin")
|
||||||
|
));
|
||||||
|
new InsertAction().execute(insertInput);
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.PENDING_INSERT_AUTOMATIONS);
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// assert that the update-automation won't run - as no UPDATE has happened on the table //
|
||||||
|
// even though the insert action does update the records!! //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
pollingAutomationRunner.run();
|
||||||
|
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.OK);
|
||||||
|
|
||||||
|
////////////////////////////////////////////
|
||||||
|
// make sure the minor person was updated //
|
||||||
|
////////////////////////////////////////////
|
||||||
|
Optional<QRecord> updatedMinorRecord = TestUtils.queryTable(TestUtils.TABLE_NAME_PERSON_MEMORY).stream().filter(r -> r.getValueInteger("id").equals(1)).findFirst();
|
||||||
|
assertThat(updatedMinorRecord)
|
||||||
|
.isPresent()
|
||||||
|
.get()
|
||||||
|
.extracting(r -> r.getValueString("firstName"))
|
||||||
|
.isEqualTo("Tim" + TestUtils.CheckAge.SUFFIX_FOR_MINORS);
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// run automations again - make sure that there haven't been any updates triggered yet //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
pollingAutomationRunner.run();
|
||||||
|
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.OK);
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// now do an user-driven update - this SHOULD trigger the update automation next time we run automations. //
|
||||||
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
UpdateInput updateInput = new UpdateInput(qInstance);
|
||||||
|
updateInput.setSession(new QSession());
|
||||||
|
updateInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
|
updateInput.setRecords(List.of(
|
||||||
|
new QRecord().withValue("id", 1).withValue("lastName", "now with a LastName"),
|
||||||
|
new QRecord().withValue("id", 2).withValue("lastName", "now with a LastName")
|
||||||
|
));
|
||||||
|
new UpdateAction().execute(updateInput);
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.PENDING_UPDATE_AUTOMATIONS);
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// assert that the update-automation DOES run now - and that it only runs once //
|
||||||
|
// note that it will only run on a sub-set of the records //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////
|
||||||
|
pollingAutomationRunner.run();
|
||||||
|
assertThat(TestUtils.LogPersonUpdate.updatedIds)
|
||||||
|
.contains(2)
|
||||||
|
.hasSize(1);
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.OK);
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////
|
||||||
|
// re-run and assert no further automations happen //
|
||||||
|
/////////////////////////////////////////////////////
|
||||||
|
TestUtils.LogPersonUpdate.updatedIds.clear();
|
||||||
|
pollingAutomationRunner.run();
|
||||||
|
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.OK);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Test a large-ish number - to demonstrate paging working.
|
||||||
|
**
|
||||||
|
** Note - this caught an issue during original development, where the QueryFilter
|
||||||
|
** attached to the Action was being re-used, w/ new "id IN *" criteria being re-added
|
||||||
|
** to it - so, good test.
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testMultiPages() throws QException
|
||||||
|
{
|
||||||
|
QInstance qInstance = TestUtils.defineInstance();
|
||||||
|
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, null);
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// insert many people - half who should be updated by the AgeChecker automation //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////
|
||||||
|
InsertInput insertInput = new InsertInput(qInstance);
|
||||||
|
insertInput.setSession(new QSession());
|
||||||
|
insertInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
|
|
||||||
|
insertInput.setRecords(new ArrayList<>());
|
||||||
|
int SIZE = 2_500;
|
||||||
|
for(int i = 0; i < SIZE; i++)
|
||||||
|
{
|
||||||
|
insertInput.getRecords().add(new QRecord().withValue("firstName", "Tim").withValue("lastName", "Number " + i).withValue("birthDate", LocalDate.now()));
|
||||||
|
insertInput.getRecords().add(new QRecord().withValue("firstName", "Darin").withValue("lastName", "Number " + i));
|
||||||
|
}
|
||||||
|
|
||||||
|
new InsertAction().execute(insertInput);
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.PENDING_INSERT_AUTOMATIONS);
|
||||||
|
|
||||||
|
/////////////////////////
|
||||||
|
// run the automations //
|
||||||
|
/////////////////////////
|
||||||
|
pollingAutomationRunner.run();
|
||||||
|
assertAllRecordsAutomationStatus(AutomationStatus.OK);
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////
|
||||||
|
// make sure that all 'minor' persons were updated (e.g., all the Tim's) //
|
||||||
|
///////////////////////////////////////////////////////////////////////////
|
||||||
|
int updatedMinorsCount = 0;
|
||||||
|
for(QRecord qRecord : TestUtils.queryTable(TestUtils.TABLE_NAME_PERSON_MEMORY))
|
||||||
|
{
|
||||||
|
if(qRecord.getValueString("firstName").startsWith("Tim"))
|
||||||
|
{
|
||||||
|
assertEquals("Tim" + TestUtils.CheckAge.SUFFIX_FOR_MINORS, qRecord.getValueString("firstName"));
|
||||||
|
updatedMinorsCount++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(SIZE, updatedMinorsCount, "Expected number of updated records");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Test a cycle that does an insert, some automations, then and an update, and more automations.
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testRunningProcess() throws QException
|
||||||
|
{
|
||||||
|
QInstance qInstance = TestUtils.defineInstance();
|
||||||
|
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, null);
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// insert 2 person records, one who should be both updated by the insert action, and should be logged by logger-on-update automation //
|
||||||
|
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
InsertInput insertInput = new InsertInput(qInstance);
|
||||||
|
insertInput.setSession(new QSession());
|
||||||
|
insertInput.setTableName(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
|
insertInput.setRecords(List.of(
|
||||||
|
new QRecord().withValue("id", 1).withValue("firstName", "Tim").withValue("birthDate", LocalDate.of(1886, Month.JUNE, 6)),
|
||||||
|
new QRecord().withValue("id", 2).withValue("firstName", "Darin").withValue("birthDate", LocalDate.of(1904, Month.APRIL, 4))
|
||||||
|
));
|
||||||
|
new InsertAction().execute(insertInput);
|
||||||
|
|
||||||
|
pollingAutomationRunner.run();
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// make sure the process ran - which means, it would have updated Tim's birth year to 1900 //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
Optional<QRecord> updatedMinorRecord = TestUtils.queryTable(TestUtils.TABLE_NAME_PERSON_MEMORY).stream().filter(r -> r.getValueInteger("id").equals(1)).findFirst();
|
||||||
|
assertThat(updatedMinorRecord)
|
||||||
|
.isPresent()
|
||||||
|
.get()
|
||||||
|
.extracting(r -> r.getValueLocalDate("birthDate").getYear())
|
||||||
|
.isEqualTo(1900);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private void assertAllRecordsAutomationStatus(AutomationStatus pendingInsertAutomations) throws QException
|
||||||
|
{
|
||||||
|
assertThat(TestUtils.queryTable(TestUtils.TABLE_NAME_PERSON_MEMORY))
|
||||||
|
.isNotEmpty()
|
||||||
|
.allMatch(r -> pendingInsertAutomations.getId().equals(r.getValue(TestUtils.standardQqqAutomationStatusField().getName())));
|
||||||
|
}
|
||||||
|
}
|
@ -81,7 +81,7 @@ class ReportActionTest
|
|||||||
public void testBigger() throws Exception
|
public void testBigger() throws Exception
|
||||||
{
|
{
|
||||||
// int recordCount = 2_000_000; // to really stress locally, use this.
|
// int recordCount = 2_000_000; // to really stress locally, use this.
|
||||||
int recordCount = 50_000;
|
int recordCount = 10_000;
|
||||||
String filename = "/tmp/ReportActionTest.csv";
|
String filename = "/tmp/ReportActionTest.csv";
|
||||||
|
|
||||||
runReport(recordCount, filename, ReportFormat.CSV, false);
|
runReport(recordCount, filename, ReportFormat.CSV, false);
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
package com.kingsrook.qqq.backend.core.modules.backend.implementations.memory;
|
package com.kingsrook.qqq.backend.core.modules.backend.implementations.memory;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
|
import java.time.Month;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
|
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
|
||||||
@ -54,6 +56,7 @@ import org.junit.jupiter.api.AfterEach;
|
|||||||
import org.junit.jupiter.api.BeforeEach;
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
|
||||||
@ -193,6 +196,101 @@ class MemoryBackendModuleTest
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testQueryOperators() throws QException
|
||||||
|
{
|
||||||
|
QInstance qInstance = TestUtils.defineInstance();
|
||||||
|
QTableMetaData table = qInstance.getTable(TestUtils.TABLE_NAME_SHAPE);
|
||||||
|
QSession session = new QSession();
|
||||||
|
|
||||||
|
InsertInput insertInput = new InsertInput(qInstance);
|
||||||
|
insertInput.setSession(session);
|
||||||
|
insertInput.setTableName(table.getName());
|
||||||
|
insertInput.setRecords(List.of(
|
||||||
|
new QRecord().withValue("id", 1).withValue("name", "Square").withValue("date", LocalDate.of(1980, Month.MAY, 31)),
|
||||||
|
new QRecord().withValue("id", 2).withValue("name", "Triangle").withValue("date", LocalDate.of(1999, Month.DECEMBER, 31))
|
||||||
|
));
|
||||||
|
new InsertAction().execute(insertInput);
|
||||||
|
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.IN, List.of(1, 2))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.IN, List.of(2, 3))).size());
|
||||||
|
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.NOT_IN, List.of(3, 4))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.NOT_IN, List.of(2, 3))).size());
|
||||||
|
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.EQUALS, List.of("Square"))).size());
|
||||||
|
assertEquals("Square", queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.EQUALS, List.of("Square"))).get(0).getValue("name"));
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("notAFieldSoNull", QCriteriaOperator.EQUALS, List.of("Square"))).size());
|
||||||
|
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.NOT_EQUALS, List.of("notFound"))).size());
|
||||||
|
assertEquals("Square", queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.NOT_EQUALS, List.of("Triangle"))).get(0).getValue("name"));
|
||||||
|
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.CONTAINS, List.of("ria"))).size());
|
||||||
|
assertEquals("Triangle", queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.CONTAINS, List.of("ria"))).get(0).getValue("name"));
|
||||||
|
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.NOT_CONTAINS, List.of("notFound"))).size());
|
||||||
|
assertEquals("Square", queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.NOT_CONTAINS, List.of("ria"))).get(0).getValue("name"));
|
||||||
|
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.CONTAINS, List.of("ria"))));
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.CONTAINS, List.of(1))));
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.CONTAINS, List.of())));
|
||||||
|
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN, List.of(LocalDate.of(2022, Month.SEPTEMBER, 1)))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN, List.of(LocalDate.of(1990, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN, List.of(LocalDate.of(1970, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN, List.of(2))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN, List.of(1))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN, List.of(0))).size());
|
||||||
|
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(LocalDate.of(2022, Month.SEPTEMBER, 1)))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(LocalDate.of(1990, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(LocalDate.of(1970, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(LocalDate.of(1980, Month.MAY, 31)))).size());
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(3))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(2))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(1))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of(0))).size());
|
||||||
|
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN, List.of(LocalDate.of(2022, Month.SEPTEMBER, 1)))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN, List.of(LocalDate.of(1990, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN, List.of(LocalDate.of(1970, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN, List.of(3))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN, List.of(2))).size());
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN, List.of(1))).size());
|
||||||
|
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(LocalDate.of(2022, Month.SEPTEMBER, 1)))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(LocalDate.of(1990, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(LocalDate.of(1980, Month.MAY, 31)))).size());
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("date", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(LocalDate.of(1970, Month.JANUARY, 1)))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(3))).size());
|
||||||
|
assertEquals(2, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(2))).size());
|
||||||
|
assertEquals(1, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(1))).size());
|
||||||
|
assertEquals(0, queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of(0))).size());
|
||||||
|
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN, List.of())));
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.GREATER_THAN_OR_EQUALS, List.of())));
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN, List.of())));
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("id", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of())));
|
||||||
|
assertThrows(QException.class, () -> queryShapes(qInstance, table, session, new QFilterCriteria("name", QCriteriaOperator.LESS_THAN_OR_EQUALS, List.of("Bob"))));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
private List<QRecord> queryShapes(QInstance qInstance, QTableMetaData table, QSession session, QFilterCriteria criteria) throws QException
|
||||||
|
{
|
||||||
|
QueryInput queryInput = new QueryInput(qInstance);
|
||||||
|
queryInput.setSession(session);
|
||||||
|
queryInput.setTableName(table.getName());
|
||||||
|
queryInput.setFilter(new QQueryFilter().withCriteria(criteria));
|
||||||
|
QueryOutput queryOutput = new QueryAction().execute(queryInput);
|
||||||
|
return queryOutput.getRecords();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
@ -24,10 +24,12 @@ package com.kingsrook.qqq.backend.core.utils;
|
|||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
|
import java.time.Month;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
|
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
|
||||||
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
|
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.person.addtopeoplesage.AddAge;
|
import com.kingsrook.qqq.backend.core.actions.processes.person.addtopeoplesage.AddAge;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.person.addtopeoplesage.GetAgeStatistics;
|
import com.kingsrook.qqq.backend.core.actions.processes.person.addtopeoplesage.GetAgeStatistics;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
||||||
@ -35,6 +37,8 @@ import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
|||||||
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.values.QCustomPossibleValueProvider;
|
import com.kingsrook.qqq.backend.core.actions.values.QCustomPossibleValueProvider;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
||||||
@ -65,10 +69,10 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionInputMet
|
|||||||
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionOutputMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFunctionOutputMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.processes.QRecordListMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.processes.QRecordListMetaData;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTracking;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTracking;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTrackingType;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTrackingType;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TableAutomationAction;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TableAutomationAction;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TriggerEvent;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TriggerEvent;
|
||||||
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||||
@ -103,6 +107,7 @@ public class TestUtils
|
|||||||
|
|
||||||
public static final String PROCESS_NAME_GREET_PEOPLE = "greet";
|
public static final String PROCESS_NAME_GREET_PEOPLE = "greet";
|
||||||
public static final String PROCESS_NAME_GREET_PEOPLE_INTERACTIVE = "greetInteractive";
|
public static final String PROCESS_NAME_GREET_PEOPLE_INTERACTIVE = "greetInteractive";
|
||||||
|
public static final String PROCESS_NAME_INCREASE_BIRTHDATE = "increaseBirthdate";
|
||||||
public static final String PROCESS_NAME_ADD_TO_PEOPLES_AGE = "addToPeoplesAge";
|
public static final String PROCESS_NAME_ADD_TO_PEOPLES_AGE = "addToPeoplesAge";
|
||||||
public static final String TABLE_NAME_PERSON_FILE = "personFile";
|
public static final String TABLE_NAME_PERSON_FILE = "personFile";
|
||||||
public static final String TABLE_NAME_PERSON_MEMORY = "personMemory";
|
public static final String TABLE_NAME_PERSON_MEMORY = "personMemory";
|
||||||
@ -144,6 +149,7 @@ public class TestUtils
|
|||||||
qInstance.addProcess(defineProcessAddToPeoplesAge());
|
qInstance.addProcess(defineProcessAddToPeoplesAge());
|
||||||
qInstance.addProcess(new BasicETLProcess().defineProcessMetaData());
|
qInstance.addProcess(new BasicETLProcess().defineProcessMetaData());
|
||||||
qInstance.addProcess(new StreamedETLProcess().defineProcessMetaData());
|
qInstance.addProcess(new StreamedETLProcess().defineProcessMetaData());
|
||||||
|
qInstance.addProcess(defineProcessIncreasePersonBirthdate());
|
||||||
|
|
||||||
qInstance.addAutomationProvider(definePollingAutomationProvider());
|
qInstance.addAutomationProvider(definePollingAutomationProvider());
|
||||||
|
|
||||||
@ -154,6 +160,72 @@ public class TestUtils
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private static QProcessMetaData defineProcessIncreasePersonBirthdate()
|
||||||
|
{
|
||||||
|
return new QProcessMetaData()
|
||||||
|
.withName(PROCESS_NAME_INCREASE_BIRTHDATE)
|
||||||
|
.withTableName(TABLE_NAME_PERSON_MEMORY)
|
||||||
|
|
||||||
|
.addStep(new QFrontendStepMetaData()
|
||||||
|
.withName("preview")
|
||||||
|
)
|
||||||
|
|
||||||
|
.addStep(new QBackendStepMetaData()
|
||||||
|
.withName("doWork")
|
||||||
|
.withCode(new QCodeReference(IncreaseBirthdateStep.class))
|
||||||
|
.withInputData(new QFunctionInputMetaData()
|
||||||
|
.withRecordListMetaData(new QRecordListMetaData().withTableName(TABLE_NAME_PERSON_MEMORY)))
|
||||||
|
.withOutputMetaData(new QFunctionOutputMetaData()
|
||||||
|
.withFieldList(List.of(new QFieldMetaData("outputMessage", QFieldType.STRING).withDefaultValue("Success!"))))
|
||||||
|
)
|
||||||
|
|
||||||
|
.addStep(new QFrontendStepMetaData()
|
||||||
|
.withName("results")
|
||||||
|
.withFormField(new QFieldMetaData("outputMessage", QFieldType.STRING))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public static class IncreaseBirthdateStep implements BackendStep
|
||||||
|
{
|
||||||
|
/*******************************************************************************
|
||||||
|
** Execute the backend step - using the request as input, and the result as output.
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
|
{
|
||||||
|
List<QRecord> recordsToUpdate = new ArrayList<>();
|
||||||
|
for(QRecord record : runBackendStepInput.getRecords())
|
||||||
|
{
|
||||||
|
LocalDate birthDate = record.getValueLocalDate("birthDate");
|
||||||
|
|
||||||
|
if(birthDate != null && birthDate.getYear() < 1900)
|
||||||
|
{
|
||||||
|
recordsToUpdate.add(new QRecord()
|
||||||
|
.withValue("id", record.getValue("id"))
|
||||||
|
.withValue("birthDate", birthDate.withYear(1900))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
UpdateInput updateInput = new UpdateInput(runBackendStepInput.getInstance());
|
||||||
|
updateInput.setSession(runBackendStepInput.getSession());
|
||||||
|
updateInput.setTableName(TABLE_NAME_PERSON_MEMORY);
|
||||||
|
updateInput.setRecords(recordsToUpdate);
|
||||||
|
new UpdateAction().execute(updateInput);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@ -375,6 +447,16 @@ public class TestUtils
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public static QTableMetaData definePersonMemoryTable()
|
public static QTableMetaData definePersonMemoryTable()
|
||||||
{
|
{
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
// the checkAge automation will only run on persons younger than this date //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////
|
||||||
|
LocalDate youngPersonLimitDate = LocalDate.now().minusYears(18);
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// the increaseBirthdate automation will only run on persons born before this date //
|
||||||
|
/////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
LocalDate increaseBirthdateLimitDate = LocalDate.of(1900, Month.JANUARY, 1);
|
||||||
|
|
||||||
return (new QTableMetaData()
|
return (new QTableMetaData()
|
||||||
.withName(TABLE_NAME_PERSON_MEMORY)
|
.withName(TABLE_NAME_PERSON_MEMORY)
|
||||||
.withBackendName(MEMORY_BACKEND_NAME)
|
.withBackendName(MEMORY_BACKEND_NAME)
|
||||||
@ -386,12 +468,19 @@ public class TestUtils
|
|||||||
.withAction(new TableAutomationAction()
|
.withAction(new TableAutomationAction()
|
||||||
.withName("checkAgeOnInsert")
|
.withName("checkAgeOnInsert")
|
||||||
.withTriggerEvent(TriggerEvent.POST_INSERT)
|
.withTriggerEvent(TriggerEvent.POST_INSERT)
|
||||||
|
.withFilter(new QQueryFilter().withCriteria(new QFilterCriteria("birthDate", QCriteriaOperator.GREATER_THAN, List.of(youngPersonLimitDate))))
|
||||||
.withCodeReference(new QCodeReference(CheckAge.class))
|
.withCodeReference(new QCodeReference(CheckAge.class))
|
||||||
)
|
)
|
||||||
|
.withAction(new TableAutomationAction()
|
||||||
|
.withName("increaseBirthdate")
|
||||||
|
.withTriggerEvent(TriggerEvent.POST_INSERT)
|
||||||
|
.withFilter(new QQueryFilter().withCriteria(new QFilterCriteria("birthDate", QCriteriaOperator.LESS_THAN, List.of(increaseBirthdateLimitDate))))
|
||||||
|
.withProcessName(PROCESS_NAME_INCREASE_BIRTHDATE)
|
||||||
|
)
|
||||||
.withAction(new TableAutomationAction()
|
.withAction(new TableAutomationAction()
|
||||||
.withName("logOnUpdatePerFilter")
|
.withName("logOnUpdatePerFilter")
|
||||||
.withTriggerEvent(TriggerEvent.POST_UPDATE)
|
.withTriggerEvent(TriggerEvent.POST_UPDATE)
|
||||||
.withFilter(new QQueryFilter().withCriteria(new QFilterCriteria("firstName", QCriteriaOperator.EQUALS, List.of("Darin"))))
|
.withFilter(new QQueryFilter().withCriteria(new QFilterCriteria("firstName", QCriteriaOperator.CONTAINS, List.of("Darin"))))
|
||||||
.withCodeReference(new QCodeReference(LogPersonUpdate.class))
|
.withCodeReference(new QCodeReference(LogPersonUpdate.class))
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@ -420,7 +509,6 @@ public class TestUtils
|
|||||||
LocalDate birthDate = record.getValueLocalDate("birthDate");
|
LocalDate birthDate = record.getValueLocalDate("birthDate");
|
||||||
if(birthDate != null && birthDate.isAfter(limitDate))
|
if(birthDate != null && birthDate.isAfter(limitDate))
|
||||||
{
|
{
|
||||||
LOG.info("Person [" + record.getValueInteger("id") + "] is a minor - updating their firstName to state such.");
|
|
||||||
recordsToUpdate.add(new QRecord()
|
recordsToUpdate.add(new QRecord()
|
||||||
.withValue("id", record.getValue("id"))
|
.withValue("id", record.getValue("id"))
|
||||||
.withValue("firstName", record.getValueString("firstName") + SUFFIX_FOR_MINORS)
|
.withValue("firstName", record.getValueString("firstName") + SUFFIX_FOR_MINORS)
|
||||||
|
Reference in New Issue
Block a user