Fix where automation status got set to OK instead of running; switch to do an automation polling thread per-table/status

This commit is contained in:
2022-10-20 10:47:02 -05:00
parent 8b3b300eb1
commit 20c42deae5
6 changed files with 162 additions and 106 deletions

View File

@ -96,4 +96,20 @@ public enum AutomationStatus implements PossibleValueEnum<Integer>
{
return (getLabel());
}
/*******************************************************************************
**
*******************************************************************************/
@SuppressWarnings("checkstyle:indentation")
public String getInsertOrUpdate()
{
return switch(this)
{
case PENDING_INSERT_AUTOMATIONS, RUNNING_INSERT_AUTOMATIONS, FAILED_INSERT_AUTOMATIONS -> "Insert";
case PENDING_UPDATE_AUTOMATIONS, RUNNING_UPDATE_AUTOMATIONS, FAILED_UPDATE_AUTOMATIONS -> "Update";
case OK -> "";
};
}
}

View File

@ -62,11 +62,6 @@ public class RecordAutomationStatusUpdater
return (false);
}
if(canWeSkipPendingAndGoToOkay(table, automationStatus))
{
automationStatus = AutomationStatus.OK;
}
///////////////////////////////////////////////////////////////////////////////////////////////////
// In case an automation is running, and it updates records - don't let those records be marked //
// as PENDING_UPDATE_AUTOMATIONS... this is meant to avoid having a record's automation update //
@ -88,6 +83,11 @@ public class RecordAutomationStatusUpdater
}
}
if(canWeSkipPendingAndGoToOkay(table, automationStatus))
{
automationStatus = AutomationStatus.OK;
}
QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails.getStatusTracking() != null && AutomationStatusTrackingType.FIELD_IN_TABLE.equals(automationDetails.getStatusTracking().getType()))
{

View File

@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.actions.automation.polling;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@ -66,22 +65,22 @@ import org.apache.logging.log4j.Logger;
/*******************************************************************************
** Runnable for the Polling Automation Provider, that looks for records that
** need automations, and executes them.
**
** An instance of this class should be created for each table/automation-status
** - see the TableActions inner record for that definition, and the static
** getTableActions method that helps someone who wants to start these threads
** figure out which ones are needed.
*******************************************************************************/
public class PollingAutomationRunner implements Runnable
public class PollingAutomationPerTableRunner implements Runnable
{
private static final Logger LOG = LogManager.getLogger(PollingAutomationRunner.class);
private static final Logger LOG = LogManager.getLogger(PollingAutomationPerTableRunner.class);
private final TableActions tableActions;
private final String name;
private QInstance instance;
private String providerName;
private Supplier<QSession> sessionSupplier;
private List<QTableMetaData> managedTables = new ArrayList<>();
private Map<String, List<TableAutomationAction>> tableInsertActions = new HashMap<>();
private Map<String, List<TableAutomationAction>> tableUpdateActions = new HashMap<>();
private Map<String, Map<AutomationStatus, List<TableAutomationAction>>> tableActions = new HashMap<>();
private static Map<TriggerEvent, AutomationStatus> triggerEventAutomationStatusMap = Map.of(
TriggerEvent.POST_INSERT, AutomationStatus.PENDING_INSERT_AUTOMATIONS,
TriggerEvent.POST_UPDATE, AutomationStatus.PENDING_UPDATE_AUTOMATIONS
@ -102,21 +101,27 @@ public class PollingAutomationRunner implements Runnable
/*******************************************************************************
**
*******************************************************************************/
public PollingAutomationRunner(QInstance instance, String providerName, Supplier<QSession> sessionSupplier)
public record TableActions(String tableName, AutomationStatus status, List<TableAutomationAction> actions)
{
this.instance = instance;
this.providerName = providerName;
this.sessionSupplier = sessionSupplier;
}
/*******************************************************************************
**
*******************************************************************************/
public static List<TableActions> getTableActions(QInstance instance, String providerName)
{
Map<String, Map<AutomationStatus, List<TableAutomationAction>>> workingTableActionMap = new HashMap<>();
List<TableActions> tableActionList = new ArrayList<>();
//////////////////////////////////////////////////////////////////////
// todo - share logic like this among any automation implementation //
//////////////////////////////////////////////////////////////////////
for(QTableMetaData table : instance.getTables().values())
{
if(table.getAutomationDetails() != null && this.providerName.equals(table.getAutomationDetails().getProviderName()))
if(table.getAutomationDetails() != null && providerName.equals(table.getAutomationDetails().getProviderName()))
{
managedTables.add(table);
///////////////////////////////////////////////////////////////////////////
// organize the table's actions by type //
// todo - in future, need user-defined actions here too (and refreshed!) //
@ -124,25 +129,40 @@ public class PollingAutomationRunner implements Runnable
for(TableAutomationAction action : table.getAutomationDetails().getActions())
{
AutomationStatus automationStatus = triggerEventAutomationStatusMap.get(action.getTriggerEvent());
tableActions.putIfAbsent(table.getName(), new HashMap<>());
tableActions.get(table.getName()).putIfAbsent(automationStatus, new ArrayList<>());
tableActions.get(table.getName()).get(automationStatus).add(action);
workingTableActionMap.putIfAbsent(table.getName(), new HashMap<>());
workingTableActionMap.get(table.getName()).putIfAbsent(automationStatus, new ArrayList<>());
workingTableActionMap.get(table.getName()).get(automationStatus).add(action);
}
//////////////////////////////
// sort actions by priority //
//////////////////////////////
if(tableInsertActions.containsKey(table.getName()))
////////////////////////////////////////////
// convert the map to tableAction records //
////////////////////////////////////////////
for(Map.Entry<AutomationStatus, List<TableAutomationAction>> entry : workingTableActionMap.get(table.getName()).entrySet())
{
tableInsertActions.get(table.getName()).sort(Comparator.comparing(TableAutomationAction::getPriority));
}
AutomationStatus automationStatus = entry.getKey();
List<TableAutomationAction> actionList = entry.getValue();
if(tableUpdateActions.containsKey(table.getName()))
{
tableUpdateActions.get(table.getName()).sort(Comparator.comparing(TableAutomationAction::getPriority));
actionList.sort(Comparator.comparing(TableAutomationAction::getPriority));
tableActionList.add(new TableActions(table.getName(), automationStatus, actionList));
}
}
}
return (tableActionList);
}
/*******************************************************************************
**
*******************************************************************************/
public PollingAutomationPerTableRunner(QInstance instance, String providerName, Supplier<QSession> sessionSupplier, TableActions tableActions)
{
this.instance = instance;
this.sessionSupplier = sessionSupplier;
this.tableActions = tableActions;
this.name = providerName + ">" + tableActions.tableName() + ">" + tableActions.status().getInsertOrUpdate();
}
@ -153,32 +173,18 @@ public class PollingAutomationRunner implements Runnable
@Override
public void run()
{
Thread.currentThread().setName(getClass().getSimpleName() + ">" + providerName);
LOG.info("Running " + this.getClass().getSimpleName() + "[providerName=" + providerName + "]");
Thread.currentThread().setName(name);
LOG.info("Running " + this.getClass().getSimpleName() + "[" + name + "]");
for(QTableMetaData table : managedTables)
try
{
try
{
processTable(table);
}
catch(Exception e)
{
LOG.error("Error processing automations on table: " + table, e);
}
QSession session = sessionSupplier != null ? sessionSupplier.get() : new QSession();
processTableInsertOrUpdate(instance.getTable(tableActions.tableName()), session, tableActions.status(), tableActions.actions());
}
catch(Exception e)
{
LOG.warn("Error running automations", e);
}
}
/*******************************************************************************
** Query for and process records that have a PENDING status on a given table.
*******************************************************************************/
private void processTable(QTableMetaData table) throws QException
{
QSession session = sessionSupplier != null ? sessionSupplier.get() : new QSession();
processTableInsertOrUpdate(table, session, AutomationStatus.PENDING_INSERT_AUTOMATIONS);
processTableInsertOrUpdate(table, session, AutomationStatus.PENDING_UPDATE_AUTOMATIONS);
}
@ -186,11 +192,8 @@ public class PollingAutomationRunner implements Runnable
/*******************************************************************************
** Query for and process records that have a PENDING_INSERT or PENDING_UPDATE status on a given table.
*******************************************************************************/
private void processTableInsertOrUpdate(QTableMetaData table, QSession session, AutomationStatus automationStatus) throws QException
private void processTableInsertOrUpdate(QTableMetaData table, QSession session, AutomationStatus automationStatus, List<TableAutomationAction> actions) throws QException
{
List<TableAutomationAction> actions = tableActions
.getOrDefault(table.getName(), Collections.emptyMap())
.getOrDefault(automationStatus, Collections.emptyList());
if(CollectionUtils.nullSafeIsEmpty(actions))
{
return;
@ -387,4 +390,14 @@ public class PollingAutomationRunner implements Runnable
}
}
/*******************************************************************************
** Getter for name
**
*******************************************************************************/
public String getName()
{
return name;
}
}

View File

@ -26,7 +26,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationRunner;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
import com.kingsrook.qqq.backend.core.actions.queues.SQSQueuePoller;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
@ -114,7 +114,7 @@ public class ScheduleManager
for(QAutomationProviderMetaData automationProvider : qInstance.getAutomationProviders().values())
{
startAutomationProvider(automationProvider);
startAutomationProviderPerTable(automationProvider);
}
for(QProcessMetaData process : qInstance.getProcesses().values())
@ -131,18 +131,26 @@ public class ScheduleManager
/*******************************************************************************
**
*******************************************************************************/
private void startAutomationProvider(QAutomationProviderMetaData automationProvider)
private void startAutomationProviderPerTable(QAutomationProviderMetaData automationProvider)
{
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, automationProvider.getName(), sessionSupplier);
StandardScheduledExecutor executor = new StandardScheduledExecutor(pollingAutomationRunner);
///////////////////////////////////////////////////////////////////////////////////
// ask the PollingAutomationPerTableRunner how many threads of itself need setup //
// then start a scheduled executor foreach one //
///////////////////////////////////////////////////////////////////////////////////
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions)
{
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableAction);
StandardScheduledExecutor executor = new StandardScheduledExecutor(runner);
QScheduleMetaData schedule = Objects.requireNonNullElseGet(automationProvider.getSchedule(), this::getDefaultSchedule);
QScheduleMetaData schedule = Objects.requireNonNullElseGet(automationProvider.getSchedule(), this::getDefaultSchedule);
executor.setName(automationProvider.getName());
setScheduleInExecutor(schedule, executor);
executor.start();
executor.setName(runner.getName());
setScheduleInExecutor(schedule, executor);
executor.start();
executors.add(executor);
executors.add(executor);
}
}

View File

@ -58,9 +58,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
/*******************************************************************************
** Unit test for PollingAutomationRunner
** Unit test for PollingAutomationPerTableRunner
*******************************************************************************/
class PollingAutomationRunnerTest
class PollingAutomationPerTableRunnerTest
{
/*******************************************************************************
@ -81,8 +81,7 @@ class PollingAutomationRunnerTest
@Test
void testInsertAndUpdate() throws QException
{
QInstance qInstance = TestUtils.defineInstance();
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, null);
QInstance qInstance = TestUtils.defineInstance();
/////////////////////////////////////////////////////////////////////////////////////////////////////////////
// insert 2 person records, both updated by the insert action, and 1 logged by logger-on-update automation //
@ -101,7 +100,7 @@ class PollingAutomationRunnerTest
// assert that the update-automation won't run - as no UPDATE has happened on the table //
// even though the insert action does update the records!! //
//////////////////////////////////////////////////////////////////////////////////////////
pollingAutomationRunner.run();
runAllTableActions(qInstance);
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
assertAllRecordsAutomationStatus(AutomationStatus.OK);
@ -114,7 +113,7 @@ class PollingAutomationRunnerTest
/////////////////////////////////////////////////////////////////////////////////////////
// run automations again - make sure that there haven't been any updates triggered yet //
/////////////////////////////////////////////////////////////////////////////////////////
pollingAutomationRunner.run();
runAllTableActions(qInstance);
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
assertAllRecordsAutomationStatus(AutomationStatus.OK);
@ -135,7 +134,7 @@ class PollingAutomationRunnerTest
// assert that the update-automation DOES run now - and that it only runs once //
// note that it will only run on a sub-set of the records //
/////////////////////////////////////////////////////////////////////////////////
pollingAutomationRunner.run();
runAllTableActions(qInstance);
assertThat(TestUtils.LogPersonUpdate.updatedIds)
.contains(2)
.hasSize(1);
@ -145,13 +144,27 @@ class PollingAutomationRunnerTest
// re-run and assert no further automations happen //
/////////////////////////////////////////////////////
TestUtils.LogPersonUpdate.updatedIds.clear();
pollingAutomationRunner.run();
runAllTableActions(qInstance);
assertThat(TestUtils.LogPersonUpdate.updatedIds).isNullOrEmpty();
assertAllRecordsAutomationStatus(AutomationStatus.OK);
}
/*******************************************************************************
**
*******************************************************************************/
private void runAllTableActions(QInstance qInstance)
{
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION);
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions)
{
new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, null, tableAction).run();
}
}
/*******************************************************************************
** Test a large-ish number - to demonstrate paging working.
**
@ -162,8 +175,7 @@ class PollingAutomationRunnerTest
@Test
void testMultiPages() throws QException
{
QInstance qInstance = TestUtils.defineInstance();
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, null);
QInstance qInstance = TestUtils.defineInstance();
//////////////////////////////////////////////////////////////////////////////////
// insert many people - half who should be updated by the AgeChecker automation //
@ -186,7 +198,7 @@ class PollingAutomationRunnerTest
/////////////////////////
// run the automations //
/////////////////////////
pollingAutomationRunner.run();
runAllTableActions(qInstance);
assertAllRecordsAutomationStatus(AutomationStatus.OK);
///////////////////////////////////////////////////////////////////////////
@ -213,8 +225,7 @@ class PollingAutomationRunnerTest
@Test
void testRunningProcess() throws QException
{
QInstance qInstance = TestUtils.defineInstance();
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, null);
QInstance qInstance = TestUtils.defineInstance();
////////////////////////////////////////////////////////////////////
// insert 2 person records, 1 to trigger the "increaseAge" action //
@ -228,7 +239,7 @@ class PollingAutomationRunnerTest
));
new InsertAction().execute(insertInput);
pollingAutomationRunner.run();
runAllTableActions(qInstance);
/////////////////////////////////////////////////////////////////////////////////////////////
// make sure the process ran - which means, it would have updated Tim's birth year to 1900 //
@ -276,19 +287,18 @@ class PollingAutomationRunnerTest
instance.getTable(TestUtils.TABLE_NAME_SHAPE)
.withField(new QFieldMetaData("automationStatus", QFieldType.INTEGER))
.setAutomationDetails(new QTableAutomationDetails()
.withProviderName(TestUtils.POLLING_AUTOMATION)
.withStatusTracking(new AutomationStatusTracking().withType(AutomationStatusTrackingType.FIELD_IN_TABLE).withFieldName("automationStatus"))
.withAction(new TableAutomationAction()
.withName("shapeToPerson")
.withTriggerEvent(TriggerEvent.POST_INSERT)
.withProcessName("shapeToPersonETLProcess")
)
);
.withProviderName(TestUtils.POLLING_AUTOMATION)
.withStatusTracking(new AutomationStatusTracking().withType(AutomationStatusTrackingType.FIELD_IN_TABLE).withFieldName("automationStatus"))
.withAction(new TableAutomationAction()
.withName("shapeToPerson")
.withTriggerEvent(TriggerEvent.POST_INSERT)
.withProcessName("shapeToPersonETLProcess")
)
);
TestUtils.insertDefaultShapes(instance);
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(instance, TestUtils.POLLING_AUTOMATION, null);
pollingAutomationRunner.run();
runAllTableActions(instance);
List<QRecord> postList = TestUtils.queryTable(instance, TestUtils.TABLE_NAME_PERSON);
assertThat(postList)

View File

@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.actions.automation.polling;
import java.time.LocalDate;
import java.time.Month;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
@ -127,7 +128,8 @@ class StandardScheduledExecutorTest
//////////////////////////////////////////////////////////////////////
qInstance.getTable(TestUtils.TABLE_NAME_PERSON_MEMORY)
.getAutomationDetails().getActions().get(0)
.setCodeReference(new QCodeReference(CaptureSessionIdAutomationHandler.class));
.withCodeReference(new QCodeReference(CaptureSessionIdAutomationHandler.class))
.withName("captureSessionId");
////////////////////////////////////////////////////////////
// insert a person that will trigger the on-insert action //
@ -184,17 +186,24 @@ class StandardScheduledExecutorTest
*******************************************************************************/
private void runPollingAutomationExecutorForAwhile(QInstance qInstance, Supplier<QSession> sessionSupplier)
{
PollingAutomationRunner pollingAutomationRunner = new PollingAutomationRunner(qInstance, TestUtils.POLLING_AUTOMATION, sessionSupplier);
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION);
List<StandardScheduledExecutor> executors = new ArrayList<>();
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions)
{
PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, sessionSupplier, tableAction);
StandardScheduledExecutor pollingAutomationExecutor = new StandardScheduledExecutor(pollingAutomationPerTableRunner);
pollingAutomationExecutor.setInitialDelayMillis(0);
pollingAutomationExecutor.setDelayMillis(100);
pollingAutomationExecutor.setQInstance(qInstance);
pollingAutomationExecutor.setName(TestUtils.POLLING_AUTOMATION);
pollingAutomationExecutor.setSessionSupplier(sessionSupplier);
pollingAutomationExecutor.start();
executors.add(pollingAutomationExecutor);
}
StandardScheduledExecutor pollingAutomationExecutor = new StandardScheduledExecutor(pollingAutomationRunner);
pollingAutomationExecutor.setInitialDelayMillis(0);
pollingAutomationExecutor.setDelayMillis(100);
pollingAutomationExecutor.setQInstance(qInstance);
pollingAutomationExecutor.setName(TestUtils.POLLING_AUTOMATION);
pollingAutomationExecutor.setSessionSupplier(sessionSupplier);
pollingAutomationExecutor.start();
SleepUtils.sleep(1, TimeUnit.SECONDS);
pollingAutomationExecutor.stop();
executors.forEach(StandardScheduledExecutor::stop);
}
}