CE-781 Add concept of sharded automations - schedule multiple instances of job, filter implicitly by shard value

This commit is contained in:
2024-01-08 14:13:44 -06:00
parent 06259041f8
commit a5420bff4c
6 changed files with 328 additions and 58 deletions

View File

@ -43,6 +43,7 @@ import com.kingsrook.qqq.backend.core.actions.tables.GetAction;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.LogPair;
import com.kingsrook.qqq.backend.core.logging.QLogger; import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
@ -87,8 +88,9 @@ public class PollingAutomationPerTableRunner implements Runnable
{ {
private static final QLogger LOG = QLogger.getLogger(PollingAutomationPerTableRunner.class); private static final QLogger LOG = QLogger.getLogger(PollingAutomationPerTableRunner.class);
private final TableActions tableActions; private final TableActionsInterface tableActions;
private final String name;
private String name;
private QInstance instance; private QInstance instance;
private Supplier<QSession> sessionSupplier; private Supplier<QSession> sessionSupplier;
@ -116,10 +118,41 @@ public class PollingAutomationPerTableRunner implements Runnable
/******************************************************************************* /*******************************************************************************
** ** Interface to be used by 2 records in this class - normal TableActions, and
** ShardedTableActions.
*******************************************************************************/ *******************************************************************************/
public record TableActions(String tableName, AutomationStatus status) public interface TableActionsInterface
{ {
/*******************************************************************************
**
*******************************************************************************/
String tableName();
/*******************************************************************************
**
*******************************************************************************/
AutomationStatus status();
}
/*******************************************************************************
** Wrapper for a pair of (tableName, automationStatus)
*******************************************************************************/
public record TableActions(String tableName, AutomationStatus status) implements TableActionsInterface
{
}
/*******************************************************************************
** extended version of TableAction, for sharding use-case - adds the shard
** details.
*******************************************************************************/
public record ShardedTableActions(String tableName, AutomationStatus status, String shardByFieldName, Serializable shardValue, String shardLabel) implements TableActionsInterface
{
} }
@ -128,16 +161,46 @@ public class PollingAutomationPerTableRunner implements Runnable
** basically just get a list of tables which at least *could* have automations ** basically just get a list of tables which at least *could* have automations
** run - either meta-data automations, or table-triggers (data/user defined). ** run - either meta-data automations, or table-triggers (data/user defined).
*******************************************************************************/ *******************************************************************************/
public static List<TableActions> getTableActions(QInstance instance, String providerName) public static List<TableActionsInterface> getTableActions(QInstance instance, String providerName)
{ {
List<TableActions> tableActionList = new ArrayList<>(); List<TableActionsInterface> tableActionList = new ArrayList<>();
for(QTableMetaData table : instance.getTables().values()) for(QTableMetaData table : instance.getTables().values())
{ {
if(table.getAutomationDetails() != null && providerName.equals(table.getAutomationDetails().getProviderName())) QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails != null && providerName.equals(automationDetails.getProviderName()))
{ {
tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS)); if(StringUtils.hasContent(automationDetails.getShardByFieldName()))
tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS)); {
//////////////////////////////////////////////////////////////////////////////////////////////
// for sharded automations, add a tableAction (of the sharded subtype) for each shard-value //
//////////////////////////////////////////////////////////////////////////////////////////////
try
{
QueryInput queryInput = new QueryInput();
queryInput.setTableName(automationDetails.getShardSourceTableName());
QueryOutput queryOutput = new QueryAction().execute(queryInput);
for(QRecord record : queryOutput.getRecords())
{
Serializable shardId = record.getValue(automationDetails.getShardIdFieldName());
String label = record.getValueString(automationDetails.getShardLabelFieldName());
tableActionList.add(new ShardedTableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label));
tableActionList.add(new ShardedTableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label));
}
}
catch(Exception e)
{
LOG.error("Error getting sharded table automation actions for a table", e, new LogPair("tableName", table.getName()));
}
}
else
{
///////////////////////////////////////////////////////////////////
// for non-sharded, we just need tabler name & automation status //
///////////////////////////////////////////////////////////////////
tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS));
tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS));
}
} }
} }
@ -149,12 +212,17 @@ public class PollingAutomationPerTableRunner implements Runnable
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public PollingAutomationPerTableRunner(QInstance instance, String providerName, Supplier<QSession> sessionSupplier, TableActions tableActions) public PollingAutomationPerTableRunner(QInstance instance, String providerName, Supplier<QSession> sessionSupplier, TableActionsInterface tableActions)
{ {
this.instance = instance; this.instance = instance;
this.sessionSupplier = sessionSupplier; this.sessionSupplier = sessionSupplier;
this.tableActions = tableActions; this.tableActions = tableActions;
this.name = providerName + ">" + tableActions.tableName() + ">" + tableActions.status().getInsertOrUpdate(); this.name = providerName + ">" + tableActions.tableName() + ">" + tableActions.status().getInsertOrUpdate();
if(tableActions instanceof ShardedTableActions shardedTableActions)
{
this.name += ":" + shardedTableActions.shardLabel();
}
} }
@ -229,6 +297,15 @@ public class PollingAutomationPerTableRunner implements Runnable
throw (new NotImplementedException("Automation Status Tracking type [" + statusTrackingType + "] is not yet implemented in here.")); throw (new NotImplementedException("Automation Status Tracking type [" + statusTrackingType + "] is not yet implemented in here."));
} }
if(tableActions instanceof ShardedTableActions shardedTableActions)
{
//////////////////////////////////////////////////////////////
// for sharded actions, add the shardBy field as a criteria //
//////////////////////////////////////////////////////////////
QQueryFilter filter = queryInput.getFilter();
filter.addCriteria(new QFilterCriteria(shardedTableActions.shardByFieldName(), QCriteriaOperator.EQUALS, shardedTableActions.shardValue()));
}
queryInput.setRecordPipe(recordPipe); queryInput.setRecordPipe(recordPipe);
return (new QueryAction().execute(queryInput)); return (new QueryAction().execute(queryInput));
}, () -> }, () ->
@ -258,7 +335,23 @@ public class PollingAutomationPerTableRunner implements Runnable
{ {
if(action.getTriggerEvent().equals(triggerEvent)) if(action.getTriggerEvent().equals(triggerEvent))
{ {
rs.add(action); ///////////////////////////////////////////////////////////
// for sharded configs, only run if the shard id matches //
///////////////////////////////////////////////////////////
if(tableActions instanceof ShardedTableActions shardedTableActions)
{
if(shardedTableActions.shardValue().equals(action.getShardId()))
{
rs.add(action);
}
}
else
{
////////////////////////////////////////////
// for non-sharded, always add the action //
////////////////////////////////////////////
rs.add(action);
}
} }
} }

View File

@ -37,6 +37,11 @@ public class QTableAutomationDetails
private Integer overrideBatchSize; private Integer overrideBatchSize;
private String shardByFieldName; // field in "this" table, to use for sharding
private String shardSourceTableName; // name of the table where the shards are defined as rows
private String shardLabelFieldName; // field in shard-source-table to use for labeling shards
private String shardIdFieldName; // field in shard-source-table to identify shards (e.g., joins to this table's shardByFieldName)
/******************************************************************************* /*******************************************************************************
@ -188,4 +193,128 @@ public class QTableAutomationDetails
return (this); return (this);
} }
/*******************************************************************************
** Getter for shardByFieldName
*******************************************************************************/
public String getShardByFieldName()
{
return (this.shardByFieldName);
}
/*******************************************************************************
** Setter for shardByFieldName
*******************************************************************************/
public void setShardByFieldName(String shardByFieldName)
{
this.shardByFieldName = shardByFieldName;
}
/*******************************************************************************
** Fluent setter for shardByFieldName
*******************************************************************************/
public QTableAutomationDetails withShardByFieldName(String shardByFieldName)
{
this.shardByFieldName = shardByFieldName;
return (this);
}
/*******************************************************************************
** Getter for shardSourceTableName
*******************************************************************************/
public String getShardSourceTableName()
{
return (this.shardSourceTableName);
}
/*******************************************************************************
** Setter for shardSourceTableName
*******************************************************************************/
public void setShardSourceTableName(String shardSourceTableName)
{
this.shardSourceTableName = shardSourceTableName;
}
/*******************************************************************************
** Fluent setter for shardSourceTableName
*******************************************************************************/
public QTableAutomationDetails withShardSourceTableName(String shardSourceTableName)
{
this.shardSourceTableName = shardSourceTableName;
return (this);
}
/*******************************************************************************
** Getter for shardLabelFieldName
*******************************************************************************/
public String getShardLabelFieldName()
{
return (this.shardLabelFieldName);
}
/*******************************************************************************
** Setter for shardLabelFieldName
*******************************************************************************/
public void setShardLabelFieldName(String shardLabelFieldName)
{
this.shardLabelFieldName = shardLabelFieldName;
}
/*******************************************************************************
** Fluent setter for shardLabelFieldName
*******************************************************************************/
public QTableAutomationDetails withShardLabelFieldName(String shardLabelFieldName)
{
this.shardLabelFieldName = shardLabelFieldName;
return (this);
}
/*******************************************************************************
** Getter for shardIdFieldName
*******************************************************************************/
public String getShardIdFieldName()
{
return (this.shardIdFieldName);
}
/*******************************************************************************
** Setter for shardIdFieldName
*******************************************************************************/
public void setShardIdFieldName(String shardIdFieldName)
{
this.shardIdFieldName = shardIdFieldName;
}
/*******************************************************************************
** Fluent setter for shardIdFieldName
*******************************************************************************/
public QTableAutomationDetails withShardIdFieldName(String shardIdFieldName)
{
this.shardIdFieldName = shardIdFieldName;
return (this);
}
} }

View File

@ -37,6 +37,7 @@ public class TableAutomationAction
private TriggerEvent triggerEvent; private TriggerEvent triggerEvent;
private Integer priority = 500; private Integer priority = 500;
private QQueryFilter filter; private QQueryFilter filter;
private Serializable shardId;
//////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////
// flag that will cause the records to cause their associations to be // // flag that will cause the records to cause their associations to be //
@ -329,4 +330,35 @@ public class TableAutomationAction
return (this); return (this);
} }
/*******************************************************************************
** Getter for shardId
*******************************************************************************/
public Serializable getShardId()
{
return (this.shardId);
}
/*******************************************************************************
** Setter for shardId
*******************************************************************************/
public void setShardId(Serializable shardId)
{
this.shardId = shardId;
}
/*******************************************************************************
** Fluent setter for shardId
*******************************************************************************/
public TableAutomationAction withShardId(Serializable shardId)
{
this.shardId = shardId;
return (this);
}
} }

View File

@ -120,52 +120,68 @@ public class ScheduleManager
return; return;
} }
for(QQueueProviderMetaData queueProvider : qInstance.getQueueProviders().values()) boolean needToClearContext = false;
try
{ {
startQueueProvider(queueProvider); if(QContext.getQInstance() == null)
}
for(QAutomationProviderMetaData automationProvider : qInstance.getAutomationProviders().values())
{
startAutomationProviderPerTable(automationProvider);
}
for(QProcessMetaData process : qInstance.getProcesses().values())
{
if(process.getSchedule() != null && allowedToStart(process.getName()))
{ {
QScheduleMetaData scheduleMetaData = process.getSchedule(); needToClearContext = true;
if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy())) QContext.init(qInstance, sessionSupplier.get());
}
for(QQueueProviderMetaData queueProvider : qInstance.getQueueProviders().values())
{
startQueueProvider(queueProvider);
}
for(QAutomationProviderMetaData automationProvider : qInstance.getAutomationProviders().values())
{
startAutomationProviderPerTable(automationProvider);
}
for(QProcessMetaData process : qInstance.getProcesses().values())
{
if(process.getSchedule() != null && allowedToStart(process.getName()))
{ {
/////////////////////////////////////////////// QScheduleMetaData scheduleMetaData = process.getSchedule();
// if no variants, or variant is serial mode // if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy()))
///////////////////////////////////////////////
startProcess(process, null);
}
else if(QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy()))
{
/////////////////////////////////////////////////////////////////////////////////////////////////////
// if this a "parallel", which for example means we want to have a thread for each backend variant //
// running at the same time, get the variant records and schedule each separately //
/////////////////////////////////////////////////////////////////////////////////////////////////////
QContext.init(qInstance, sessionSupplier.get());
QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend());
for(QRecord qRecord : CollectionUtils.nonNullList(getBackendVariantFilteredRecords(process)))
{ {
try ///////////////////////////////////////////////
// if no variants, or variant is serial mode //
///////////////////////////////////////////////
startProcess(process, null);
}
else if(QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy()))
{
/////////////////////////////////////////////////////////////////////////////////////////////////////
// if this a "parallel", which for example means we want to have a thread for each backend variant //
// running at the same time, get the variant records and schedule each separately //
/////////////////////////////////////////////////////////////////////////////////////////////////////
QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend());
for(QRecord qRecord : CollectionUtils.nonNullList(getBackendVariantFilteredRecords(process)))
{ {
startProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField()))); try
} {
catch(Exception e) startProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField())));
{ }
LOG.error("An error starting process [" + process.getLabel() + "], with backend variant data.", e, new LogPair("variantQRecord", qRecord)); catch(Exception e)
{
LOG.error("An error starting process [" + process.getLabel() + "], with backend variant data.", e, new LogPair("variantQRecord", qRecord));
}
} }
} }
else
{
LOG.error("Unsupported Schedule Run Strategy [" + process.getSchedule().getVariantRunStrategy() + "] was provided.");
}
} }
else }
{ }
LOG.error("Unsupported Schedule Run Strategy [" + process.getSchedule().getVariantRunStrategy() + "] was provided."); finally
} {
if(needToClearContext)
{
QContext.clear();
} }
} }
} }
@ -210,8 +226,8 @@ public class ScheduleManager
// ask the PollingAutomationPerTableRunner how many threads of itself need setup // // ask the PollingAutomationPerTableRunner how many threads of itself need setup //
// then start a scheduled executor foreach one // // then start a scheduled executor foreach one //
/////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName()); List<PollingAutomationPerTableRunner.TableActionsInterface> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions)
{ {
if(allowedToStart(tableAction.tableName())) if(allowedToStart(tableAction.tableName()))
{ {

View File

@ -202,8 +202,8 @@ class PollingAutomationPerTableRunnerTest extends BaseTest
*******************************************************************************/ *******************************************************************************/
private void runAllTableActions(QInstance qInstance) throws QException private void runAllTableActions(QInstance qInstance) throws QException
{ {
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); List<PollingAutomationPerTableRunner.TableActionsInterface> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION);
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions)
{ {
PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, QSession::new, tableAction); PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, QSession::new, tableAction);
@ -504,8 +504,8 @@ class PollingAutomationPerTableRunnerTest extends BaseTest
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
assertThatThrownBy(() -> assertThatThrownBy(() ->
{ {
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); List<PollingAutomationPerTableRunner.TableActionsInterface> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION);
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions)
{ {
PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(qInstance, TestUtils.POLLING_AUTOMATION, QSession::new, tableAction); PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(qInstance, TestUtils.POLLING_AUTOMATION, QSession::new, tableAction);
@ -564,7 +564,7 @@ class PollingAutomationPerTableRunnerTest extends BaseTest
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
public PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(QInstance instance, String providerName, Supplier<QSession> sessionSupplier, TableActions tableActions) public PollingAutomationPerTableRunnerThatShouldSimulateServerShutdownMidRun(QInstance instance, String providerName, Supplier<QSession> sessionSupplier, TableActionsInterface tableActions)
{ {
super(instance, providerName, sessionSupplier, tableActions); super(instance, providerName, sessionSupplier, tableActions);
} }

View File

@ -186,9 +186,9 @@ class StandardScheduledExecutorTest extends BaseTest
*******************************************************************************/ *******************************************************************************/
private void runPollingAutomationExecutorForAwhile(QInstance qInstance, Supplier<QSession> sessionSupplier) private void runPollingAutomationExecutorForAwhile(QInstance qInstance, Supplier<QSession> sessionSupplier)
{ {
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION); List<PollingAutomationPerTableRunner.TableActionsInterface> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, TestUtils.POLLING_AUTOMATION);
List<StandardScheduledExecutor> executors = new ArrayList<>(); List<StandardScheduledExecutor> executors = new ArrayList<>();
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions) for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions)
{ {
PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, sessionSupplier, tableAction); PollingAutomationPerTableRunner pollingAutomationPerTableRunner = new PollingAutomationPerTableRunner(qInstance, TestUtils.POLLING_AUTOMATION, sessionSupplier, tableAction);
StandardScheduledExecutor pollingAutomationExecutor = new StandardScheduledExecutor(pollingAutomationPerTableRunner); StandardScheduledExecutor pollingAutomationExecutor = new StandardScheduledExecutor(pollingAutomationPerTableRunner);