CE-936 - scheduling updates:

- move queues & automations to be scheduled (only) at the lower-level (per-queue, per-table) - not at the higher "provider" levels.
- update quartz to delete jobs which are no-longer active, at end of QScheduleManager's setup
-
This commit is contained in:
2024-03-13 12:08:13 -05:00
parent 246984892a
commit 3265d6d842
18 changed files with 385 additions and 235 deletions

View File

@ -132,6 +132,11 @@ public class PollingAutomationPerTableRunner implements Runnable
*******************************************************************************/
String tableName();
/*******************************************************************************
**
*******************************************************************************/
QTableAutomationDetails tableAutomationDetails();
/*******************************************************************************
**
*******************************************************************************/
@ -143,7 +148,7 @@ public class PollingAutomationPerTableRunner implements Runnable
/*******************************************************************************
** Wrapper for a pair of (tableName, automationStatus)
*******************************************************************************/
public record TableActions(String tableName, AutomationStatus status) implements TableActionsInterface
public record TableActions(String tableName, QTableAutomationDetails tableAutomationDetails, AutomationStatus status) implements TableActionsInterface
{
/*******************************************************************************
**
@ -159,7 +164,7 @@ public class PollingAutomationPerTableRunner implements Runnable
** extended version of TableAction, for sharding use-case - adds the shard
** details.
*******************************************************************************/
public record ShardedTableActions(String tableName, AutomationStatus status, String shardByFieldName, Serializable shardValue, String shardLabel) implements TableActionsInterface
public record ShardedTableActions(String tableName, QTableAutomationDetails tableAutomationDetails, AutomationStatus status, String shardByFieldName, Serializable shardValue, String shardLabel) implements TableActionsInterface
{
/*******************************************************************************
**
@ -198,8 +203,8 @@ public class PollingAutomationPerTableRunner implements Runnable
{
Serializable shardId = record.getValue(automationDetails.getShardIdFieldName());
String label = record.getValueString(automationDetails.getShardLabelFieldName());
tableActionList.add(new ShardedTableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label));
tableActionList.add(new ShardedTableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label));
tableActionList.add(new ShardedTableActions(table.getName(), automationDetails, AutomationStatus.PENDING_INSERT_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label));
tableActionList.add(new ShardedTableActions(table.getName(), automationDetails, AutomationStatus.PENDING_UPDATE_AUTOMATIONS, automationDetails.getShardByFieldName(), shardId, label));
}
}
catch(Exception e)
@ -209,11 +214,11 @@ public class PollingAutomationPerTableRunner implements Runnable
}
else
{
///////////////////////////////////////////////////////////////////
// for non-sharded, we just need tabler name & automation status //
///////////////////////////////////////////////////////////////////
tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_INSERT_AUTOMATIONS));
tableActionList.add(new TableActions(table.getName(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS));
//////////////////////////////////////////////////////////////////
// for non-sharded, we just need table name & automation status //
//////////////////////////////////////////////////////////////////
tableActionList.add(new TableActions(table.getName(), automationDetails, AutomationStatus.PENDING_INSERT_AUTOMATIONS));
tableActionList.add(new TableActions(table.getName(), automationDetails, AutomationStatus.PENDING_UPDATE_AUTOMATIONS));
}
}
}

View File

@ -417,11 +417,6 @@ public class QInstanceValidator
assertCondition(StringUtils.hasContent(sqsQueueProvider.getSecretKey()), "Missing secretKey for SQSQueueProvider: " + name);
assertCondition(StringUtils.hasContent(sqsQueueProvider.getBaseURL()), "Missing baseURL for SQSQueueProvider: " + name);
assertCondition(StringUtils.hasContent(sqsQueueProvider.getRegion()), "Missing region for SQSQueueProvider: " + name);
if(assertCondition(sqsQueueProvider.getSchedule() != null, "Missing schedule for SQSQueueProvider: " + name))
{
validateScheduleMetaData(sqsQueueProvider.getSchedule(), qInstance, "SQSQueueProvider " + name + ", schedule: ");
}
}
runPlugins(QQueueProviderMetaData.class, queueProvider, qInstance);
@ -440,6 +435,14 @@ public class QInstanceValidator
assertCondition(qInstance.getProcesses() != null && qInstance.getProcess(queue.getProcessName()) != null, "Unrecognized processName for queue: " + name);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo - if we have, in the future, a provider that doesn't require schedules per-queue, then make this check conditional //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
if(assertCondition(queue.getSchedule() != null, "Missing schedule for SQSQueueProvider: " + name))
{
validateScheduleMetaData(queue.getSchedule(), qInstance, "SQSQueueProvider " + name + ", schedule: ");
}
runPlugins(QQueueMetaData.class, queue, qInstance);
});
}
@ -479,11 +482,6 @@ public class QInstanceValidator
assertCondition(Objects.equals(name, automationProvider.getName()), "Inconsistent naming for automationProvider: " + name + "/" + automationProvider.getName() + ".");
assertCondition(automationProvider.getType() != null, "Missing type for automationProvider: " + name);
if(assertCondition(automationProvider.getSchedule() != null, "Missing schedule for automationProvider: " + name))
{
validateScheduleMetaData(automationProvider.getSchedule(), qInstance, "automationProvider " + name + ", schedule: ");
}
runPlugins(QAutomationProviderMetaData.class, automationProvider, qInstance);
});
}
@ -1026,6 +1024,11 @@ public class QInstanceValidator
assertCondition(qInstance.getAutomationProvider(providerName) != null, " has an unrecognized providerName: " + providerName);
}
if(assertCondition(automationDetails.getSchedule() != null, prefix + "Missing schedule for automations"))
{
validateScheduleMetaData(automationDetails.getSchedule(), qInstance, prefix + " automationDetails, schedule: ");
}
//////////////////////////////////
// validate the status tracking //
//////////////////////////////////

View File

@ -24,7 +24,6 @@ package com.kingsrook.qqq.backend.core.model.metadata.automation;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.TopLevelMetaDataInterface;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
/*******************************************************************************
@ -35,8 +34,6 @@ public class QAutomationProviderMetaData implements TopLevelMetaDataInterface
private String name;
private QAutomationProviderType type;
private QScheduleMetaData schedule;
/*******************************************************************************
@ -107,40 +104,6 @@ public class QAutomationProviderMetaData implements TopLevelMetaDataInterface
/*******************************************************************************
** Getter for schedule
**
*******************************************************************************/
public QScheduleMetaData getSchedule()
{
return schedule;
}
/*******************************************************************************
** Setter for schedule
**
*******************************************************************************/
public void setSchedule(QScheduleMetaData schedule)
{
this.schedule = schedule;
}
/*******************************************************************************
** Fluent setter for schedule
**
*******************************************************************************/
public QAutomationProviderMetaData withSchedule(QScheduleMetaData schedule)
{
this.schedule = schedule;
return (this);
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -22,9 +22,6 @@
package com.kingsrook.qqq.backend.core.model.metadata.queues;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
/*******************************************************************************
** Meta-data for an source of Amazon SQS queues (e.g, an aws account/credential
** set, with a common base URL).
@ -39,8 +36,6 @@ public class SQSQueueProviderMetaData extends QQueueProviderMetaData
private String region;
private String baseURL;
private QScheduleMetaData schedule;
/*******************************************************************************
@ -201,38 +196,4 @@ public class SQSQueueProviderMetaData extends QQueueProviderMetaData
return (this);
}
/*******************************************************************************
** Getter for schedule
**
*******************************************************************************/
public QScheduleMetaData getSchedule()
{
return schedule;
}
/*******************************************************************************
** Setter for schedule
**
*******************************************************************************/
public void setSchedule(QScheduleMetaData schedule)
{
this.schedule = schedule;
}
/*******************************************************************************
** Fluent setter for schedule
**
*******************************************************************************/
public SQSQueueProviderMetaData withSchedule(QScheduleMetaData schedule)
{
this.schedule = schedule;
return (this);
}
}

View File

@ -28,11 +28,12 @@ import com.kingsrook.qqq.backend.core.utils.StringUtils;
/*******************************************************************************
** Meta-data to define scheduled actions within QQQ.
**
** Initially, only supports repeating jobs, either on a given # of seconds or millis.
** Supports repeating jobs, either on a given # of seconds or millis, or cron
** expressions (though cron may not be supported by all schedulers!)
**
** Can also specify an initialDelay - e.g., to avoid all jobs starting up at the
** same moment.
**
** In the future we most likely would want to allow cron strings to be added here.
*******************************************************************************/
public class QScheduleMetaData
{

View File

@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.model.metadata.tables.automation;
import java.util.ArrayList;
import java.util.List;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
/*******************************************************************************
@ -37,6 +38,8 @@ public class QTableAutomationDetails
private Integer overrideBatchSize;
private QScheduleMetaData schedule;
private String shardByFieldName; // field in "this" table, to use for sharding
private String shardSourceTableName; // name of the table where the shards are defined as rows
private String shardLabelFieldName; // field in shard-source-table to use for labeling shards
@ -317,4 +320,35 @@ public class QTableAutomationDetails
return (this);
}
/*******************************************************************************
** Getter for schedule
*******************************************************************************/
public QScheduleMetaData getSchedule()
{
return (this.schedule);
}
/*******************************************************************************
** Setter for schedule
*******************************************************************************/
public void setSchedule(QScheduleMetaData schedule)
{
this.schedule = schedule;
}
/*******************************************************************************
** Fluent setter for schedule
*******************************************************************************/
public QTableAutomationDetails withSchedule(QScheduleMetaData schedule)
{
this.schedule = schedule;
return (this);
}
}

View File

@ -24,8 +24,10 @@ package com.kingsrook.qqq.backend.core.scheduler;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.context.CapturedContext;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
@ -37,12 +39,14 @@ import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QSchedulerMetaData;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import com.kingsrook.qqq.backend.core.utils.collections.MapBuilder;
import org.apache.commons.lang.NotImplementedException;
@ -227,6 +231,8 @@ public class QScheduleManager
// todo - read dynamic schedules and schedule those things //
// e.g., user-scheduled processes, reports //
/////////////////////////////////////////////////////////////
// ScheduledJob scheduledJob = new ScheduledJob();
// setupScheduledJob(scheduledJob);
//////////////////////////////////////////////////////////
// let the schedulers know we're done with this process //
@ -242,7 +248,7 @@ public class QScheduleManager
private void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData)
{
QSchedulerInterface scheduler = getScheduler(process.getSchedule().getSchedulerName());
scheduler.setupProcess(process, backendVariantData, SchedulerUtils.allowedToStart(process));
scheduler.setupProcess(process, backendVariantData, process.getSchedule(), SchedulerUtils.allowedToStart(process));
}
@ -269,8 +275,18 @@ public class QScheduleManager
*******************************************************************************/
private void setupSqsProvider(SQSQueueProviderMetaData queueProvider)
{
QSchedulerInterface scheduler = getScheduler(queueProvider.getSchedule().getSchedulerName());
scheduler.setupSqsProvider(queueProvider, SchedulerUtils.allowedToStart(queueProvider));
boolean allowedToStartProvider = SchedulerUtils.allowedToStart(queueProvider);
for(QQueueMetaData queue : qInstance.getQueues().values())
{
QSchedulerInterface scheduler = getScheduler(queue.getSchedule().getSchedulerName());
boolean allowedToStart = allowedToStartProvider && SchedulerUtils.allowedToStart(queue.getName());
if(queueProvider.getName().equals(queue.getProviderName()))
{
scheduler.setupSqsPoller(queueProvider, queue, queue.getSchedule(), allowedToStart);
}
}
}
@ -280,8 +296,21 @@ public class QScheduleManager
*******************************************************************************/
private void setupAutomationProviderPerTable(QAutomationProviderMetaData automationProvider)
{
QSchedulerInterface scheduler = getScheduler(automationProvider.getSchedule().getSchedulerName());
scheduler.setupAutomationProviderPerTable(automationProvider, SchedulerUtils.allowedToStart(automationProvider));
boolean allowedToStartProvider = SchedulerUtils.allowedToStart(automationProvider);
///////////////////////////////////////////////////////////////////////////////////
// ask the PollingAutomationPerTableRunner how many threads of itself need setup //
// then schedule each one of them. //
///////////////////////////////////////////////////////////////////////////////////
List<PollingAutomationPerTableRunner.TableActionsInterface> tableActionList = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActionsInterface tableActions : tableActionList)
{
boolean allowedToStart = allowedToStartProvider && SchedulerUtils.allowedToStart(tableActions.tableName());
QScheduleMetaData schedule = tableActions.tableAutomationDetails().getSchedule();
QSchedulerInterface scheduler = getScheduler(schedule.getSchedulerName());
scheduler.setupTableAutomation(automationProvider, tableActions, schedule, allowedToStart);
}
}

View File

@ -24,9 +24,12 @@ package com.kingsrook.qqq.backend.core.scheduler;
import java.io.Serializable;
import java.util.Map;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
/*******************************************************************************
@ -37,17 +40,17 @@ public interface QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, boolean allowedToStart);
void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**
*******************************************************************************/
void setupSqsProvider(SQSQueueProviderMetaData queueProvider, boolean allowedToStart);
void setupSqsPoller(SQSQueueProviderMetaData queueProvider, QQueueMetaData queue, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**
*******************************************************************************/
void setupAutomationProviderPerTable(QAutomationProviderMetaData automationProvider, boolean allowedToStart);
void setupTableAutomation(QAutomationProviderMetaData automationProvider, PollingAutomationPerTableRunner.TableActionsInterface tableActions, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**

View File

@ -36,6 +36,7 @@ import java.util.Properties;
import java.util.Set;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
@ -46,7 +47,6 @@ import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMeta
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.scheduler.QSchedulerInterface;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerUtils;
import com.kingsrook.qqq.backend.core.utils.memoization.AnyKey;
import com.kingsrook.qqq.backend.core.utils.memoization.Memoization;
import org.quartz.CronExpression;
@ -83,12 +83,24 @@ public class QuartzScheduler implements QSchedulerInterface
private Scheduler scheduler;
/////////////////////////////////////////////////////////////////////////////////////////
// create memoization objects for some quartz-query functions, that we'll only want to //
// use during our setup routine, when we'd query it many times over and over again. //
// So default to a timeout of 0 (effectively disabling memoization). then in the //
// start-of-setup and end-of-setup methods, temporarily increase, then re-decrease //
/////////////////////////////////////////////////////////////////////////////////////////
private Memoization<AnyKey, List<String>> jobGroupNamesMemoization = new Memoization<AnyKey, List<String>>()
.withTimeout(Duration.of(5, ChronoUnit.SECONDS));
.withTimeout(Duration.of(0, ChronoUnit.SECONDS));
private Memoization<String, Set<JobKey>> jobKeyNamesMemoization = new Memoization<String, Set<JobKey>>()
.withTimeout(Duration.of(5, ChronoUnit.SECONDS));
.withTimeout(Duration.of(0, ChronoUnit.SECONDS));
///////////////////////////////////////////////////////////////////////////////
// vars used during the setup routine, to figure out what jobs need deleted. //
///////////////////////////////////////////////////////////////////////////////
private boolean insideSetup = false;
private List<QuartzJobAndTriggerWrapper> scheduledJobsAtStartOfSetup = new ArrayList<>();
private List<QuartzJobAndTriggerWrapper> scheduledJobsAtEndOfSetup = new ArrayList<>();
/*******************************************************************************
@ -202,7 +214,7 @@ public class QuartzScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, boolean allowedToStart)
public void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, QScheduleMetaData schedule, boolean allowedToStart)
{
/////////////////////////
// set up job data map //
@ -215,7 +227,72 @@ public class QuartzScheduler implements QSchedulerInterface
jobData.put("backendVariantData", backendVariantData);
}
scheduleJob(process.getName(), "processes", QuartzRunProcessJob.class, jobData, process.getSchedule(), allowedToStart);
scheduleJob(process.getName(), "processes", QuartzRunProcessJob.class, jobData, schedule, allowedToStart);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void startOfSetupSchedules()
{
this.insideSetup = true;
this.jobGroupNamesMemoization.setTimeout(Duration.ofSeconds(5));
this.jobKeyNamesMemoization.setTimeout(Duration.ofSeconds(5));
try
{
this.scheduledJobsAtStartOfSetup = queryQuartz();
}
catch(Exception e)
{
LOG.warn("Error querying quartz for the currently scheduled jobs during startup - will not be able to delete no-longer-needed jobs!", e);
}
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void endOfSetupSchedules()
{
this.insideSetup = false;
this.jobGroupNamesMemoization.setTimeout(Duration.ofSeconds(0));
this.jobKeyNamesMemoization.setTimeout(Duration.ofSeconds(0));
if(this.scheduledJobsAtStartOfSetup == null)
{
return;
}
try
{
Set<JobKey> startJobKeys = this.scheduledJobsAtStartOfSetup.stream().map(w -> w.jobDetail().getKey()).collect(Collectors.toSet());
Set<JobKey> endJobKeys = scheduledJobsAtEndOfSetup.stream().map(w -> w.jobDetail().getKey()).collect(Collectors.toSet());
/////////////////////////////////////////////////////////////////////////////////////////////////////
// remove all 'end' keys from the set of start keys. any left-over start-keys need to be deleted. //
/////////////////////////////////////////////////////////////////////////////////////////////////////
startJobKeys.removeAll(endJobKeys);
for(JobKey jobKey : startJobKeys)
{
LOG.info("Deleting job that had previously been scheduled, but doesn't appear to be any more", logPair("jobKey", jobKey));
deleteJob(jobKey);
}
}
catch(Exception e)
{
LOG.warn("Error trying to clean up no-longer-needed jobs at end of scheduler setup", e);
}
////////////////////////////////////////////////////
// reset these lists, no need to keep them around //
////////////////////////////////////////////////////
this.scheduledJobsAtStartOfSetup = null;
this.scheduledJobsAtEndOfSetup = null;
}
@ -294,6 +371,15 @@ public class QuartzScheduler implements QSchedulerInterface
resumeJob(jobKey.getName(), jobKey.getGroup());
}
///////////////////////////////////////////////////////////////////////////
// if we're inside the setup event (e.g., initial startup), then capture //
// this job as one that is currently active and should be kept. //
///////////////////////////////////////////////////////////////////////////
if(insideSetup)
{
scheduledJobsAtEndOfSetup.add(new QuartzJobAndTriggerWrapper(jobDetail, trigger, null));
}
return (true);
}
catch(Exception e)
@ -309,57 +395,37 @@ public class QuartzScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupSqsProvider(SQSQueueProviderMetaData sqsQueueProvider, boolean allowedToStartProvider)
public void setupSqsPoller(SQSQueueProviderMetaData queueProvider, QQueueMetaData queue, QScheduleMetaData schedule, boolean allowedToStart)
{
for(QQueueMetaData queue : qInstance.getQueues().values())
{
boolean allowedToStart = allowedToStartProvider && SchedulerUtils.allowedToStart(queue.getName());
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("queueProviderName", queueProvider.getName());
jobData.put("queueName", queue.getName());
if(sqsQueueProvider.getName().equals(queue.getProviderName()))
{
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("queueProviderName", sqsQueueProvider.getName());
jobData.put("queueName", queue.getName());
scheduleJob(queue.getName(), "sqsQueue", QuartzSqsPollerJob.class, jobData, sqsQueueProvider.getSchedule(), allowedToStart);
}
}
scheduleJob(queue.getName(), "sqsQueue", QuartzSqsPollerJob.class, jobData, schedule, allowedToStart);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void setupAutomationProviderPerTable(QAutomationProviderMetaData automationProvider, boolean allowedToStartProvider)
public void setupTableAutomation(QAutomationProviderMetaData automationProvider, PollingAutomationPerTableRunner.TableActionsInterface tableActions, QScheduleMetaData schedule, boolean allowedToStart)
{
///////////////////////////////////////////////////////////////////////////////////
// ask the PollingAutomationPerTableRunner how many threads of itself need setup //
// then start a scheduled executor foreach one //
///////////////////////////////////////////////////////////////////////////////////
List<PollingAutomationPerTableRunner.TableActionsInterface> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions)
{
boolean allowedToStart = allowedToStartProvider && SchedulerUtils.allowedToStart(tableAction.tableName());
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("automationProviderName", automationProvider.getName());
jobData.put("tableName", tableActions.tableName());
jobData.put("automationStatus", tableActions.status().toString());
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("automationProviderName", automationProvider.getName());
jobData.put("tableName", tableAction.tableName());
jobData.put("automationStatus", tableAction.status().toString());
scheduleJob(tableAction.tableName() + "." + tableAction.status(), "tableAutomations", QuartzTableAutomationsJob.class, jobData, automationProvider.getSchedule(), allowedToStart);
}
scheduleJob(tableActions.tableName() + "." + tableActions.status(), "tableAutomations", QuartzTableAutomationsJob.class, jobData, schedule, allowedToStart);
}
/*******************************************************************************
**
*******************************************************************************/
@ -416,7 +482,7 @@ public class QuartzScheduler implements QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
private boolean deleteJob(JobKey jobKey)
public boolean deleteJob(JobKey jobKey)
{
try
{

View File

@ -27,6 +27,8 @@ import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomati
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
@ -63,7 +65,24 @@ public class QuartzTableAutomationsJob implements Job
automationStatus = AutomationStatus.valueOf(jobDataMap.getString("automationStatus"));
QInstance qInstance = QuartzScheduler.getInstance().getQInstance();
PollingAutomationPerTableRunner.TableActionsInterface tableAction = new PollingAutomationPerTableRunner.TableActions(tableName, automationStatus);
QTableMetaData table = qInstance.getTable(tableName);
if(table == null)
{
LOG.warn("Could not find table for automations in QInstance", logPair("tableName", tableName));
return;
}
QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails == null)
{
LOG.warn("Could not find automationDetails for table for automations in QInstance", logPair("tableName", tableName));
return;
}
///////////////////////////////////
// todo - sharded automations... //
///////////////////////////////////
PollingAutomationPerTableRunner.TableActionsInterface tableAction = new PollingAutomationPerTableRunner.TableActions(tableName, automationDetails, automationStatus);
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProviderName, QuartzScheduler.getInstance().getSessionSupplier(), tableAction);
/////////////

View File

@ -26,7 +26,6 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.actions.queues.SQSQueuePoller;
@ -141,32 +140,19 @@ public class SimpleScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupAutomationProviderPerTable(QAutomationProviderMetaData automationProvider, boolean allowedToStartProvider)
public void setupTableAutomation(QAutomationProviderMetaData automationProvider, PollingAutomationPerTableRunner.TableActionsInterface tableActions, QScheduleMetaData schedule, boolean allowedToStart)
{
if(!allowedToStartProvider)
if(!allowedToStart)
{
return;
}
///////////////////////////////////////////////////////////////////////////////////
// ask the PollingAutomationPerTableRunner how many threads of itself need setup //
// then start a scheduled executor foreach one //
///////////////////////////////////////////////////////////////////////////////////
List<PollingAutomationPerTableRunner.TableActionsInterface> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions)
{
if(SchedulerUtils.allowedToStart(tableAction.tableName()))
{
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableAction);
StandardScheduledExecutor executor = new StandardScheduledExecutor(runner);
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableActions);
StandardScheduledExecutor executor = new StandardScheduledExecutor(runner);
QScheduleMetaData schedule = Objects.requireNonNullElseGet(automationProvider.getSchedule(), this::getDefaultSchedule);
executor.setName(runner.getName());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}
}
executor.setName(runner.getName());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}
@ -175,9 +161,9 @@ public class SimpleScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupSqsProvider(SQSQueueProviderMetaData queueProvider, boolean allowedToStartProvider)
public void setupSqsPoller(SQSQueueProviderMetaData queueProvider, QQueueMetaData queue, QScheduleMetaData schedule, boolean allowedToStart)
{
if(!allowedToStartProvider)
if(!allowedToStart)
{
return;
}
@ -185,27 +171,17 @@ public class SimpleScheduler implements QSchedulerInterface
QInstance scheduleManagerQueueInstance = qInstance;
Supplier<QSession> scheduleManagerSessionSupplier = sessionSupplier;
for(QQueueMetaData queue : qInstance.getQueues().values())
{
if(queueProvider.getName().equals(queue.getProviderName()) && SchedulerUtils.allowedToStart(queue.getName()))
{
SQSQueuePoller sqsQueuePoller = new SQSQueuePoller();
sqsQueuePoller.setQueueProviderMetaData(queueProvider);
sqsQueuePoller.setQueueMetaData(queue);
sqsQueuePoller.setQInstance(scheduleManagerQueueInstance);
sqsQueuePoller.setSessionSupplier(scheduleManagerSessionSupplier);
SQSQueuePoller sqsQueuePoller = new SQSQueuePoller();
sqsQueuePoller.setQueueProviderMetaData(queueProvider);
sqsQueuePoller.setQueueMetaData(queue);
sqsQueuePoller.setQInstance(scheduleManagerQueueInstance);
sqsQueuePoller.setSessionSupplier(scheduleManagerSessionSupplier);
StandardScheduledExecutor executor = new StandardScheduledExecutor(sqsQueuePoller);
StandardScheduledExecutor executor = new StandardScheduledExecutor(sqsQueuePoller);
QScheduleMetaData schedule = Objects.requireNonNullElseGet(queue.getSchedule(),
() -> Objects.requireNonNullElseGet(queueProvider.getSchedule(),
this::getDefaultSchedule));
executor.setName(queue.getName());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}
}
executor.setName(queue.getName());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}
@ -214,7 +190,7 @@ public class SimpleScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, boolean allowedToStart)
public void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, QScheduleMetaData schedule, boolean allowedToStart)
{
if(!allowedToStart)
{
@ -228,7 +204,7 @@ public class SimpleScheduler implements QSchedulerInterface
StandardScheduledExecutor executor = new StandardScheduledExecutor(runProcess);
executor.setName("process:" + process.getName());
setScheduleInExecutor(process.getSchedule(), executor);
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}

View File

@ -32,8 +32,10 @@ import com.kingsrook.qqq.backend.core.model.session.QSession;
/*******************************************************************************
** Standard class ran by ScheduleManager. Takes a Runnable in its constructor -
** that's the code that actually executes.
** Standard class ran by SimpleScheduler. Takes a Runnable in its constructor -
** that's the code that actually executes. Internally, this class will launch
** a newSingleThreadScheduledExecutor / ScheduledExecutorService to run the
** runnable on a repeating delay.
**
*******************************************************************************/
public class StandardScheduledExecutor

View File

@ -575,8 +575,8 @@ class PollingAutomationPerTableRunnerTest extends BaseTest
@Test
void testLoadingRecordTypesToEnsureClassCoverage()
{
new PollingAutomationPerTableRunner.TableActions(null, null).noopToFakeTestCoverage();
new PollingAutomationPerTableRunner.ShardedTableActions(null, null, null, null, null).noopToFakeTestCoverage();
new PollingAutomationPerTableRunner.TableActions(null, null, null).noopToFakeTestCoverage();
new PollingAutomationPerTableRunner.ShardedTableActions(null, null, null, null, null, null).noopToFakeTestCoverage();
}

View File

@ -469,19 +469,19 @@ public class QInstanceValidatorTest extends BaseTest
assertValidationSuccess((qInstance) -> qInstance.getProcess(processName).withSchedule(baseScheduleMetaData.get().withCronExpression(validCronString).withCronTimeZoneId("UTC")));
assertValidationSuccess((qInstance) -> qInstance.getProcess(processName).withSchedule(baseScheduleMetaData.get().withCronExpression(validCronString).withCronTimeZoneId("America/New_York")));
//////////////////////////////////////////////////////////////////
// make sure automation providers get their schedules validated //
//////////////////////////////////////////////////////////////////
assertValidationFailureReasons((qInstance) -> qInstance.getAutomationProvider(TestUtils.POLLING_AUTOMATION).withSchedule(baseScheduleMetaData.get()
///////////////////////////////////////////////////////////////
// make sure table automations get their schedules validated //
///////////////////////////////////////////////////////////////
assertValidationFailureReasons((qInstance) -> qInstance.getTable(TestUtils.TABLE_NAME_PERSON_MEMORY).getAutomationDetails().withSchedule(baseScheduleMetaData.get()
.withSchedulerName(null)
.withCronExpression(validCronString)
.withCronTimeZoneId("UTC")),
"is missing a scheduler name");
/////////////////////////////////////////////////////////////
// make sure queue providers get their schedules validated //
/////////////////////////////////////////////////////////////
assertValidationFailureReasons((qInstance) -> ((SQSQueueProviderMetaData)qInstance.getQueueProvider(TestUtils.DEFAULT_QUEUE_PROVIDER)).withSchedule(baseScheduleMetaData.get()
////////////////////////////////////////////////////
// make sure queues get their schedules validated //
////////////////////////////////////////////////////
assertValidationFailureReasons((qInstance) -> (qInstance.getQueue(TestUtils.TEST_SQS_QUEUE)).withSchedule(baseScheduleMetaData.get()
.withSchedulerName(null)
.withCronExpression(validCronString)
.withCronTimeZoneId("UTC")),

View File

@ -43,7 +43,9 @@ import com.kingsrook.qqq.backend.core.utils.SleepUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.quartz.SchedulerException;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -59,7 +61,27 @@ class QuartzSchedulerTest extends BaseTest
@AfterEach
void afterEach()
{
QScheduleManager.getInstance().unInit();
try
{
QScheduleManager.getInstance().unInit();
}
catch(IllegalStateException ise)
{
/////////////////////////////////////////////////////////////////
// ok, might just mean that this test didn't init the instance //
/////////////////////////////////////////////////////////////////
}
try
{
QuartzScheduler.getInstance().unInit();
}
catch(IllegalStateException ise)
{
/////////////////////////////////////////////////////////////////
// ok, might just mean that this test didn't init the instance //
/////////////////////////////////////////////////////////////////
}
}
@ -84,15 +106,7 @@ class QuartzSchedulerTest extends BaseTest
//////////////////////////////////////////
// add a process we can run and observe //
//////////////////////////////////////////
qInstance.addProcess(new QProcessMetaData()
.withName("testScheduledProcess")
.withSchedule(new QScheduleMetaData()
.withSchedulerName(QuartzTestUtils.QUARTZ_SCHEDULER_NAME)
.withRepeatMillis(2)
.withInitialDelaySeconds(0))
.withStepList(List.of(new QBackendStepMetaData()
.withName("step")
.withCode(new QCodeReference(BasicStep.class)))));
qInstance.addProcess(buildTestProcess("testScheduledProcess"));
//////////////////////////////////////////////////////////////////////////////
// start the schedule manager, which will schedule things, and start quartz //
@ -108,7 +122,7 @@ class QuartzSchedulerTest extends BaseTest
qScheduleManager.stopAsync();
System.out.println("Ran: " + BasicStep.counter + " times");
assertTrue(BasicStep.counter > 1, "Scheduled process should have ran at least twice (but only ran [" + BasicStep.counter + "] time(s).");
assertTrue(BasicStep.counter > 1, "Scheduled process should have ran at least twice (but only ran [" + BasicStep.counter + "] time(s)).");
//////////////////////////////////////////////////////
// make sure poller ran, and didn't issue any warns //
@ -132,6 +146,56 @@ class QuartzSchedulerTest extends BaseTest
/*******************************************************************************
**
*******************************************************************************/
private static QProcessMetaData buildTestProcess(String name)
{
return new QProcessMetaData()
.withName(name)
.withSchedule(new QScheduleMetaData()
.withSchedulerName(QuartzTestUtils.QUARTZ_SCHEDULER_NAME)
.withRepeatMillis(2)
.withInitialDelaySeconds(0))
.withStepList(List.of(new QBackendStepMetaData()
.withName("step")
.withCode(new QCodeReference(BasicStep.class))));
}
/*******************************************************************************
**
*******************************************************************************/
@Test
void testRemovingNoLongerNeededJobsDuringSetupSchedules() throws SchedulerException
{
QInstance qInstance = QContext.getQInstance();
QuartzTestUtils.setupInstanceForQuartzTests();
////////////////////////////
// put two jobs in quartz //
////////////////////////////
QProcessMetaData test1 = buildTestProcess("test1");
QProcessMetaData test2 = buildTestProcess("test2");
qInstance.addProcess(test1);
qInstance.addProcess(test2);
QuartzScheduler quartzScheduler = QuartzScheduler.initInstance(qInstance, QuartzTestUtils.QUARTZ_SCHEDULER_NAME, QuartzTestUtils.getQuartzProperties(), () -> QContext.getQSession());
quartzScheduler.setupProcess(test1, null, test1.getSchedule(), false);
quartzScheduler.setupProcess(test2, null, test2.getSchedule(), false);
quartzScheduler.startOfSetupSchedules();
quartzScheduler.setupProcess(test1, null, test1.getSchedule(), false);
quartzScheduler.endOfSetupSchedules();
List<QuartzJobAndTriggerWrapper> quartzJobAndTriggerWrappers = quartzScheduler.queryQuartz();
assertEquals(1, quartzJobAndTriggerWrappers.size());
assertEquals("test1", quartzJobAndTriggerWrappers.get(0).jobDetail().getKey().getName());
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -26,7 +26,6 @@ import java.util.List;
import java.util.Properties;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.quartz.QuartzSchedulerMetaData;
import org.quartz.SchedulerException;
@ -43,7 +42,7 @@ public class QuartzTestUtils
/*******************************************************************************
**
*******************************************************************************/
private static Properties getQuartzProperties()
public static Properties getQuartzProperties()
{
Properties quartzProperties = new Properties();
quartzProperties.put("org.quartz.scheduler.instanceName", QUARTZ_SCHEDULER_NAME);
@ -76,12 +75,16 @@ public class QuartzTestUtils
////////////////////////////////////////////////////////////////////////////////
// set the queue providers & automation providers to use the quartz scheduler //
////////////////////////////////////////////////////////////////////////////////
qInstance.getAutomationProviders().values()
.forEach(ap -> ap.getSchedule().setSchedulerName(QUARTZ_SCHEDULER_NAME));
qInstance.getQueueProviders().values()
.forEach(qp -> ((SQSQueueProviderMetaData) qp).getSchedule().setSchedulerName(QUARTZ_SCHEDULER_NAME));
qInstance.getTables().values().forEach(t ->
{
if(t.getAutomationDetails() != null)
{
t.getAutomationDetails().getSchedule().setSchedulerName(QUARTZ_SCHEDULER_NAME);
}
});
qInstance.getQueues().values()
.forEach(q -> q.getSchedule().setSchedulerName(QUARTZ_SCHEDULER_NAME));
}

View File

@ -89,8 +89,28 @@ class QuartzJobsProcessTest extends BaseTest
@AfterEach
void afterEach()
{
QScheduleManager.getInstance().stop();
QScheduleManager.getInstance().unInit();
try
{
QScheduleManager.getInstance().stop();
QScheduleManager.getInstance().unInit();
}
catch(IllegalStateException ise)
{
/////////////////////////////////////////////////////////////////
// ok, might just mean that this test didn't init the instance //
/////////////////////////////////////////////////////////////////
}
try
{
QuartzScheduler.getInstance().unInit();
}
catch(IllegalStateException ise)
{
/////////////////////////////////////////////////////////////////
// ok, might just mean that this test didn't init the instance //
/////////////////////////////////////////////////////////////////
}
}

View File

@ -180,6 +180,7 @@ public class TestUtils
public static final String SECURITY_KEY_TYPE_INTERNAL_OR_EXTERNAL = "internalOrExternal";
public static final String SIMPLE_SCHEDULER_NAME = "simpleScheduler";
public static final String TEST_SQS_QUEUE = "testSQSQueue";
@ -366,10 +367,7 @@ public class TestUtils
private static QAutomationProviderMetaData definePollingAutomationProvider()
{
return (new PollingAutomationProviderMetaData()
.withName(POLLING_AUTOMATION)
.withSchedule(new QScheduleMetaData()
.withSchedulerName(SIMPLE_SCHEDULER_NAME)
.withRepeatSeconds(60)));
.withName(POLLING_AUTOMATION));
}
@ -746,6 +744,9 @@ public class TestUtils
{
return (new QTableAutomationDetails()
.withProviderName(POLLING_AUTOMATION)
.withSchedule(new QScheduleMetaData()
.withSchedulerName(SIMPLE_SCHEDULER_NAME)
.withRepeatSeconds(60))
.withStatusTracking(new AutomationStatusTracking()
.withType(AutomationStatusTrackingType.FIELD_IN_TABLE)
.withFieldName("qqqAutomationStatus")));
@ -1333,10 +1334,7 @@ public class TestUtils
.withAccessKey(accessKey)
.withSecretKey(secretKey)
.withRegion(region)
.withBaseURL(baseURL)
.withSchedule(new QScheduleMetaData()
.withRepeatSeconds(60)
.withSchedulerName(SIMPLE_SCHEDULER_NAME)));
.withBaseURL(baseURL));
}
@ -1347,10 +1345,13 @@ public class TestUtils
private static QQueueMetaData defineTestSqsQueue()
{
return (new QQueueMetaData()
.withName("testSQSQueue")
.withName(TEST_SQS_QUEUE)
.withProviderName(DEFAULT_QUEUE_PROVIDER)
.withQueueName("test-queue")
.withProcessName(PROCESS_NAME_INCREASE_BIRTHDATE));
.withProcessName(PROCESS_NAME_INCREASE_BIRTHDATE)
.withSchedule(new QScheduleMetaData()
.withRepeatSeconds(60)
.withSchedulerName(SIMPLE_SCHEDULER_NAME)));
}