diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunner.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunner.java index 8d37403e..5a82d704 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunner.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunner.java @@ -43,6 +43,7 @@ import com.kingsrook.qqq.backend.core.actions.tables.GetAction; import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.logging.LogPair; import com.kingsrook.qqq.backend.core.logging.QLogger; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; @@ -87,8 +88,9 @@ public class PollingAutomationPerTableRunner implements Runnable { private static final QLogger LOG = QLogger.getLogger(PollingAutomationPerTableRunner.class); - private final TableActions tableActions; - private final String name; + private final TableActionsInterface tableActions; + + private String name; private QInstance instance; private Supplier sessionSupplier; @@ -116,10 +118,41 @@ public class PollingAutomationPerTableRunner implements Runnable /******************************************************************************* - ** + ** Interface to be used by 2 records in this class - normal TableActions, and + ** ShardedTableActions. *******************************************************************************/ - public record TableActions(String tableName, AutomationStatus status) + public interface TableActionsInterface { + /******************************************************************************* + ** + *******************************************************************************/ + String tableName(); + + /******************************************************************************* + ** + *******************************************************************************/ + AutomationStatus status(); + } + + + + /******************************************************************************* + ** Wrapper for a pair of (tableName, automationStatus) + *******************************************************************************/ + public record TableActions(String tableName, AutomationStatus status) implements TableActionsInterface + { + + } + + + + /******************************************************************************* + ** extended version of TableAction, for sharding use-case - adds the shard + ** details. + *******************************************************************************/ + public record ShardedTableActions(String tableName, AutomationStatus status, String shardByFieldName, Serializable shardValue, String shardLabel) implements TableActionsInterface + { + } @@ -128,16 +161,46 @@ public class PollingAutomationPerTableRunner implements Runnable ** basically just get a list of tables which at least *could* have automations ** run - either meta-data automations, or table-triggers (data/user defined). *******************************************************************************/ - public static List getTableActions(QInstance instance, String providerName) + public static List getTableActions(QInstance instance, String providerName) { - List tableActionList = new ArrayList<>(); + List tableActionList = new ArrayList<>(); for(QTableMetaData table : instance.getTables().values()) { - if(table.getAutomationDetails() != null && providerName.equals(table.getAutomationDetails().getProviderName())) + QTableAutomationDetails automationDetails = table.getAutomationDetails(); + if(automationDetails != null && providerName.equals(automationDetails.getProviderName())) { - tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS)); - tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS)); + if(StringUtils.hasContent(automationDetails.getShardByFieldName())) + { + ////////////////////////////////////////////////////////////////////////////////////////////// + // for sharded automations, add a tableAction (of the sharded subtype) for each shard-value // + ////////////////////////////////////////////////////////////////////////////////////////////// + try + { + QueryInput queryInput = new QueryInput(); + queryInput.setTableName(automationDetails.getShardSourceTableName()); + QueryOutput queryOutput = new QueryAction().execute(queryInput); + for(QRecord record : queryOutput.getRecords()) + { + Serializable shardId = record.getValue(automationDetails.getShardIdFieldName()); + String label = record.getValueString(automationDetails.getShardLabelFieldName()); + tableActionList.add(new ShardedTableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label)); + tableActionList.add(new ShardedTableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label)); + } + } + catch(Exception e) + { + LOG.error("Error getting sharded table automation actions for a table", e, new LogPair("tableName", table.getName())); + } + } + else + { + /////////////////////////////////////////////////////////////////// + // for non-sharded, we just need tabler name & automation status // + /////////////////////////////////////////////////////////////////// + tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS)); + tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS)); + } } } @@ -149,12 +212,17 @@ public class PollingAutomationPerTableRunner implements Runnable /******************************************************************************* ** *******************************************************************************/ - public PollingAutomationPerTableRunner(QInstance instance, String providerName, Supplier sessionSupplier, TableActions tableActions) + public PollingAutomationPerTableRunner(QInstance instance, String providerName, Supplier sessionSupplier, TableActionsInterface tableActions) { this.instance = instance; this.sessionSupplier = sessionSupplier; this.tableActions = tableActions; this.name = providerName + ">" + tableActions.tableName() + ">" + tableActions.status().getInsertOrUpdate(); + + if(tableActions instanceof ShardedTableActions shardedTableActions) + { + this.name += ":" + shardedTableActions.shardLabel(); + } } @@ -229,6 +297,15 @@ public class PollingAutomationPerTableRunner implements Runnable throw (new NotImplementedException("Automation Status Tracking type [" + statusTrackingType + "] is not yet implemented in here.")); } + if(tableActions instanceof ShardedTableActions shardedTableActions) + { + ////////////////////////////////////////////////////////////// + // for sharded actions, add the shardBy field as a criteria // + ////////////////////////////////////////////////////////////// + QQueryFilter filter = queryInput.getFilter(); + filter.addCriteria(new QFilterCriteria(shardedTableActions.shardByFieldName(), QCriteriaOperator.EQUALS, shardedTableActions.shardValue())); + } + queryInput.setRecordPipe(recordPipe); return (new QueryAction().execute(queryInput)); }, () -> @@ -258,7 +335,23 @@ public class PollingAutomationPerTableRunner implements Runnable { if(action.getTriggerEvent().equals(triggerEvent)) { - rs.add(action); + /////////////////////////////////////////////////////////// + // for sharded configs, only run if the shard id matches // + /////////////////////////////////////////////////////////// + if(tableActions instanceof ShardedTableActions shardedTableActions) + { + if(shardedTableActions.shardValue().equals(action.getShardId())) + { + rs.add(action); + } + } + else + { + //////////////////////////////////////////// + // for non-sharded, always add the action // + //////////////////////////////////////////// + rs.add(action); + } } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/QTableAutomationDetails.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/QTableAutomationDetails.java index b075d0a9..303e2932 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/QTableAutomationDetails.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/QTableAutomationDetails.java @@ -37,6 +37,11 @@ public class QTableAutomationDetails private Integer overrideBatchSize; + private String shardByFieldName; // field in "this" table, to use for sharding + private String shardSourceTableName; // name of the table where the shards are defined as rows + private String shardLabelFieldName; // field in shard-source-table to use for labeling shards + private String shardIdFieldName; // field in shard-source-table to identify shards (e.g., joins to this table's shardByFieldName) + /******************************************************************************* @@ -188,4 +193,128 @@ public class QTableAutomationDetails return (this); } + + + /******************************************************************************* + ** Getter for shardByFieldName + *******************************************************************************/ + public String getShardByFieldName() + { + return (this.shardByFieldName); + } + + + + /******************************************************************************* + ** Setter for shardByFieldName + *******************************************************************************/ + public void setShardByFieldName(String shardByFieldName) + { + this.shardByFieldName = shardByFieldName; + } + + + + /******************************************************************************* + ** Fluent setter for shardByFieldName + *******************************************************************************/ + public QTableAutomationDetails withShardByFieldName(String shardByFieldName) + { + this.shardByFieldName = shardByFieldName; + return (this); + } + + + + /******************************************************************************* + ** Getter for shardSourceTableName + *******************************************************************************/ + public String getShardSourceTableName() + { + return (this.shardSourceTableName); + } + + + + /******************************************************************************* + ** Setter for shardSourceTableName + *******************************************************************************/ + public void setShardSourceTableName(String shardSourceTableName) + { + this.shardSourceTableName = shardSourceTableName; + } + + + + /******************************************************************************* + ** Fluent setter for shardSourceTableName + *******************************************************************************/ + public QTableAutomationDetails withShardSourceTableName(String shardSourceTableName) + { + this.shardSourceTableName = shardSourceTableName; + return (this); + } + + + + /******************************************************************************* + ** Getter for shardLabelFieldName + *******************************************************************************/ + public String getShardLabelFieldName() + { + return (this.shardLabelFieldName); + } + + + + /******************************************************************************* + ** Setter for shardLabelFieldName + *******************************************************************************/ + public void setShardLabelFieldName(String shardLabelFieldName) + { + this.shardLabelFieldName = shardLabelFieldName; + } + + + + /******************************************************************************* + ** Fluent setter for shardLabelFieldName + *******************************************************************************/ + public QTableAutomationDetails withShardLabelFieldName(String shardLabelFieldName) + { + this.shardLabelFieldName = shardLabelFieldName; + return (this); + } + + + + /******************************************************************************* + ** Getter for shardIdFieldName + *******************************************************************************/ + public String getShardIdFieldName() + { + return (this.shardIdFieldName); + } + + + + /******************************************************************************* + ** Setter for shardIdFieldName + *******************************************************************************/ + public void setShardIdFieldName(String shardIdFieldName) + { + this.shardIdFieldName = shardIdFieldName; + } + + + + /******************************************************************************* + ** Fluent setter for shardIdFieldName + *******************************************************************************/ + public QTableAutomationDetails withShardIdFieldName(String shardIdFieldName) + { + this.shardIdFieldName = shardIdFieldName; + return (this); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/TableAutomationAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/TableAutomationAction.java index aa043a8b..b089661c 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/TableAutomationAction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/tables/automation/TableAutomationAction.java @@ -37,6 +37,7 @@ public class TableAutomationAction private TriggerEvent triggerEvent; private Integer priority = 500; private QQueryFilter filter; + private Serializable shardId; //////////////////////////////////////////////////////////////////////// // flag that will cause the records to cause their associations to be // @@ -329,4 +330,35 @@ public class TableAutomationAction return (this); } + + + /******************************************************************************* + ** Getter for shardId + *******************************************************************************/ + public Serializable getShardId() + { + return (this.shardId); + } + + + + /******************************************************************************* + ** Setter for shardId + *******************************************************************************/ + public void setShardId(Serializable shardId) + { + this.shardId = shardId; + } + + + + /******************************************************************************* + ** Fluent setter for shardId + *******************************************************************************/ + public TableAutomationAction withShardId(Serializable shardId) + { + this.shardId = shardId; + return (this); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java index ff898089..375aebb9 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java @@ -120,52 +120,68 @@ public class ScheduleManager return; } - for(QQueueProviderMetaData queueProvider : qInstance.getQueueProviders().values()) + boolean needToClearContext = false; + try { - startQueueProvider(queueProvider); - } - - for(QAutomationProviderMetaData automationProvider : qInstance.getAutomationProviders().values()) - { - startAutomationProviderPerTable(automationProvider); - } - - for(QProcessMetaData process : qInstance.getProcesses().values()) - { - if(process.getSchedule() != null && allowedToStart(process.getName())) + if(QContext.getQInstance() == null) { - QScheduleMetaData scheduleMetaData = process.getSchedule(); - if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy())) + needToClearContext = true; + QContext.init(qInstance, sessionSupplier.get()); + } + + for(QQueueProviderMetaData queueProvider : qInstance.getQueueProviders().values()) + { + startQueueProvider(queueProvider); + } + + for(QAutomationProviderMetaData automationProvider : qInstance.getAutomationProviders().values()) + { + startAutomationProviderPerTable(automationProvider); + } + + for(QProcessMetaData process : qInstance.getProcesses().values()) + { + if(process.getSchedule() != null && allowedToStart(process.getName())) { - /////////////////////////////////////////////// - // if no variants, or variant is serial mode // - /////////////////////////////////////////////// - startProcess(process, null); - } - else if(QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy())) - { - ///////////////////////////////////////////////////////////////////////////////////////////////////// - // if this a "parallel", which for example means we want to have a thread for each backend variant // - // running at the same time, get the variant records and schedule each separately // - ///////////////////////////////////////////////////////////////////////////////////////////////////// - QContext.init(qInstance, sessionSupplier.get()); - QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend()); - for(QRecord qRecord : CollectionUtils.nonNullList(getBackendVariantFilteredRecords(process))) + QScheduleMetaData scheduleMetaData = process.getSchedule(); + if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy())) { - try + /////////////////////////////////////////////// + // if no variants, or variant is serial mode // + /////////////////////////////////////////////// + startProcess(process, null); + } + else if(QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy())) + { + ///////////////////////////////////////////////////////////////////////////////////////////////////// + // if this a "parallel", which for example means we want to have a thread for each backend variant // + // running at the same time, get the variant records and schedule each separately // + ///////////////////////////////////////////////////////////////////////////////////////////////////// + QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend()); + for(QRecord qRecord : CollectionUtils.nonNullList(getBackendVariantFilteredRecords(process))) { - startProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField()))); - } - catch(Exception e) - { - LOG.error("An error starting process [" + process.getLabel() + "], with backend variant data.", e, new LogPair("variantQRecord", qRecord)); + try + { + startProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField()))); + } + catch(Exception e) + { + LOG.error("An error starting process [" + process.getLabel() + "], with backend variant data.", e, new LogPair("variantQRecord", qRecord)); + } } } + else + { + LOG.error("Unsupported Schedule Run Strategy [" + process.getSchedule().getVariantRunStrategy() + "] was provided."); + } } - else - { - LOG.error("Unsupported Schedule Run Strategy [" + process.getSchedule().getVariantRunStrategy() + "] was provided."); - } + } + } + finally + { + if(needToClearContext) + { + QContext.clear(); } } } @@ -210,8 +226,8 @@ public class ScheduleManager // ask the PollingAutomationPerTableRunner how many threads of itself need setup // // then start a scheduled executor foreach one // /////////////////////////////////////////////////////////////////////////////////// - List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName()); - for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) + List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName()); + for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions) { if(allowedToStart(tableAction.tableName())) { diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunnerTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunnerTest.java index d4dcb1d5..a6d4f647 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunnerTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/PollingAutomationPerTableRunnerTest.java @@ -202,8 +202,8 @@ class PollingAutomationPerTableRunnerTest extends BaseTest *******************************************************************************/ private void runAllTableActions(QInstance qInstance) throws QException { - List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); - for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) + List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); + for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions) { PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, QSession::new, tableAction); @@ -504,8 +504,8 @@ class PollingAutomationPerTableRunnerTest extends BaseTest ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// assertThatThrownBy(() -> { - List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); - for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) + List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); + for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions) { PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(qInstance, TestUtils.POLLING_AUTOMATION, QSession::new, tableAction); @@ -564,7 +564,7 @@ class PollingAutomationPerTableRunnerTest extends BaseTest /******************************************************************************* ** *******************************************************************************/ - public PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(QInstance instance, String providerName, Supplier sessionSupplier, TableActions tableActions) + public PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(QInstance instance, String providerName, Supplier sessionSupplier, TableActionsInterface tableActions) { super(instance, providerName, sessionSupplier, tableActions); } diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/StandardScheduledExecutorTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/StandardScheduledExecutorTest.java index 7f5681ab..57e0055d 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/StandardScheduledExecutorTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/automation/polling/StandardScheduledExecutorTest.java @@ -186,9 +186,9 @@ class StandardScheduledExecutorTest extends BaseTest *******************************************************************************/ private void runPollingAutomationExecutorForAwhile(QInstance qInstance, Supplier sessionSupplier) { - List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); + List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); List executors = new ArrayList<>(); - for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) + for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions) { PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, sessionSupplier, tableAction); StandardScheduledExecutor pollingAutomationExecutor = new StandardScheduledExecutor(pollingAutomationPerTableRunner);