CE-936 - Refactored, for more generic handling of job types

- Introduced SchedulableType, SchedulableRunner, SchedulableIdentity classes
- removed job-type specific methods from QScheduleManager and QSchedulerInterface
- Add scheduler-level management processes
- Change quartz to not change schedules during service startup
- re-added repeatSeconds to ScheduledJob
This commit is contained in:
2024-03-18 12:24:40 -05:00
parent 0130e34112
commit 753c224196
30 changed files with 2059 additions and 673 deletions

View File

@ -435,10 +435,7 @@ public class QInstanceValidator
assertCondition(qInstance.getProcesses() != null && qInstance.getProcess(queue.getProcessName()) != null, "Unrecognized processName for queue: " + name);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// todo - if we have, in the future, a provider that doesn't require schedules per-queue, then make this check conditional //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
if(assertCondition(queue.getSchedule() != null, "Missing schedule for SQSQueueProvider: " + name))
if(queue.getSchedule() != null)
{
validateScheduleMetaData(queue.getSchedule(), qInstance, "SQSQueueProvider " + name + ", schedule: ");
}
@ -1024,7 +1021,7 @@ public class QInstanceValidator
assertCondition(qInstance.getAutomationProvider(providerName) != null, " has an unrecognized providerName: " + providerName);
}
if(assertCondition(automationDetails.getSchedule() != null, prefix + "Missing schedule for automations"))
if(automationDetails.getSchedule() != null)
{
validateScheduleMetaData(automationDetails.getSchedule(), qInstance, prefix + " automationDetails, schedule: ");
}

View File

@ -56,6 +56,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.reporting.QReportMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QSchedulerMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.security.QSecurityKeyType;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import io.github.cdimascio.dotenv.Dotenv;
@ -91,7 +92,9 @@ public class QInstance
private Map<String, QWidgetMetaDataInterface> widgets = new LinkedHashMap<>();
private Map<String, QQueueProviderMetaData> queueProviders = new LinkedHashMap<>();
private Map<String, QQueueMetaData> queues = new LinkedHashMap<>();
private Map<String, QSchedulerMetaData> schedulers = new LinkedHashMap<>();
private Map<String, QSchedulerMetaData> schedulers = new LinkedHashMap<>();
private Map<String, SchedulableType> schedulableTypes = new LinkedHashMap<>();
private Map<String, QSupplementalInstanceMetaData> supplementalMetaData = new LinkedHashMap<>();
@ -1278,4 +1281,54 @@ public class QInstance
}
/*******************************************************************************
**
*******************************************************************************/
public void addSchedulableType(SchedulableType schedulableType)
{
String name = schedulableType.getName();
if(!StringUtils.hasContent(name))
{
throw (new IllegalArgumentException("Attempted to add a schedulableType without a name."));
}
if(this.schedulableTypes.containsKey(name))
{
throw (new IllegalArgumentException("Attempted to add a second schedulableType with name: " + name));
}
this.schedulableTypes.put(name, schedulableType);
}
/*******************************************************************************
**
*******************************************************************************/
public SchedulableType getSchedulableType(String name)
{
return (this.schedulableTypes.get(name));
}
/*******************************************************************************
** Getter for schedulableTypes
**
*******************************************************************************/
public Map<String, SchedulableType> getSchedulableTypes()
{
return schedulableTypes;
}
/*******************************************************************************
** Setter for schedulableTypes
**
*******************************************************************************/
public void setSchedulableTypes(Map<String, SchedulableType> schedulableTypes)
{
this.schedulableTypes = schedulableTypes;
}
}

View File

@ -33,6 +33,7 @@ import com.kingsrook.qqq.backend.core.model.data.QAssociation;
import com.kingsrook.qqq.backend.core.model.data.QField;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.data.QRecordEntity;
import com.kingsrook.qqq.backend.core.model.metadata.fields.DisplayFormat;
import com.kingsrook.qqq.backend.core.model.metadata.fields.ValueTooLongBehavior;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.collections.MutableMap;
@ -69,6 +70,9 @@ public class ScheduledJob extends QRecordEntity
@QField(maxLength = 100, valueTooLongBehavior = ValueTooLongBehavior.ERROR, possibleValueSourceName = TimeZonePossibleValueSourceMetaDataProvider.NAME)
private String cronTimeZoneId;
@QField(displayFormat = DisplayFormat.COMMAS)
private Integer repeatSeconds;
@QField(isRequired = true, maxLength = 100, valueTooLongBehavior = ValueTooLongBehavior.ERROR, possibleValueSourceName = ScheduledJobType.NAME)
private String type;
@ -458,4 +462,35 @@ public class ScheduledJob extends QRecordEntity
return (this);
}
/*******************************************************************************
** Getter for repeatSeconds
*******************************************************************************/
public Integer getRepeatSeconds()
{
return (this.repeatSeconds);
}
/*******************************************************************************
** Setter for repeatSeconds
*******************************************************************************/
public void setRepeatSeconds(Integer repeatSeconds)
{
this.repeatSeconds = repeatSeconds;
}
/*******************************************************************************
** Fluent setter for repeatSeconds
*******************************************************************************/
public ScheduledJob withRepeatSeconds(Integer repeatSeconds)
{
this.repeatSeconds = repeatSeconds;
return (this);
}
}

View File

@ -66,7 +66,7 @@ public class ScheduledJobsMetaDataProvider
instance.addPossibleValueSource(QPossibleValueSource.newForEnum(ScheduledJobType.NAME, ScheduledJobType.values()));
instance.addPossibleValueSource(defineSchedulersPossibleValueSource());
defineStandardJoins(instance);
defineStandardScriptsWidgets(instance);
defineStandardWidgets(instance);
}
@ -74,11 +74,12 @@ public class ScheduledJobsMetaDataProvider
/*******************************************************************************
**
*******************************************************************************/
public void defineStandardScriptsWidgets(QInstance instance)
public void defineStandardWidgets(QInstance instance)
{
QJoinMetaData join = instance.getJoin(JOB_PARAMETER_JOIN_NAME);
instance.addWidget(ChildRecordListRenderer.widgetMetaDataBuilder(join)
.withCanAddChildRecord(true)
.withManageAssociationName(ScheduledJobParameter.TABLE_NAME)
.withLabel("Parameters")
.getWidgetMetaData()
.withPermissionRules(new QPermissionRules().withLevel(PermissionLevel.NOT_PROTECTED)));
@ -165,7 +166,7 @@ public class ScheduledJobsMetaDataProvider
.withRecordLabelFormat("%s")
.withRecordLabelFields("label")
.withSection(new QFieldSection("identity", new QIcon().withName("badge"), Tier.T1, List.of("id", "label", "description")))
.withSection(new QFieldSection("schedule", new QIcon().withName("alarm"), Tier.T2, List.of("cronExpression", "cronTimeZoneId")))
.withSection(new QFieldSection("schedule", new QIcon().withName("alarm"), Tier.T2, List.of("cronExpression", "cronTimeZoneId", "repeatSeconds")))
.withSection(new QFieldSection("settings", new QIcon().withName("tune"), Tier.T2, List.of("type", "isActive", "schedulerName")))
.withSection(new QFieldSection("parameters", new QIcon().withName("list"), Tier.T2).withWidgetName(JOB_PARAMETER_JOIN_NAME))
.withSection(new QFieldSection("dates", new QIcon().withName("calendar_month"), Tier.T3, List.of("createDate", "modifyDate")));
@ -178,9 +179,9 @@ public class ScheduledJobsMetaDataProvider
tableMetaData.withCustomizer(TableCustomizers.POST_DELETE_RECORD, customizerReference);
tableMetaData.withAssociation(new Association()
.withName(ScheduledJobParameter.TABLE_NAME)
.withAssociatedTableName(ScheduledJobParameter.TABLE_NAME)
.withJoinName(JOB_PARAMETER_JOIN_NAME)
.withName(ScheduledJobParameter.TABLE_NAME));
.withJoinName(JOB_PARAMETER_JOIN_NAME));
return (tableMetaData);
}
@ -195,8 +196,7 @@ public class ScheduledJobsMetaDataProvider
QTableMetaData tableMetaData = defineStandardTable(backendName, ScheduledJobParameter.TABLE_NAME, ScheduledJobParameter.class)
.withRecordLabelFormat("%s - %s")
.withRecordLabelFields("scheduledJobId", "key")
.withSection(new QFieldSection("identity", new QIcon().withName("badge"), Tier.T1, List.of("id", "scheduledJobId", "key")))
.withSection(new QFieldSection("value", new QIcon().withName("dataset"), Tier.T2, List.of("value")))
.withSection(new QFieldSection("identity", new QIcon().withName("badge"), Tier.T1, List.of("id", "scheduledJobId", "key", "value")))
.withSection(new QFieldSection("dates", new QIcon().withName("calendar_month"), Tier.T3, List.of("createDate", "modifyDate")));
return (tableMetaData);

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.context.CapturedContext;
import com.kingsrook.qqq.backend.core.context.QContext;
@ -40,19 +41,27 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.VariantRunStrategy;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QSchedulerMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJob;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobType;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.BasicSchedulableIdentity;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentityFactory;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableProcessRunner;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableRunner;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableSQSQueueRunner;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableTableAutomationsRunner;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.collections.MapBuilder;
import org.apache.commons.lang.NotImplementedException;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
@ -87,17 +96,48 @@ public class QScheduleManager
** Singleton initiator - e.g., must be called to initially initialize the singleton
** before anyone else calls getInstance (they'll get an error if they call that first).
*******************************************************************************/
public static QScheduleManager initInstance(QInstance qInstance, Supplier<QSession> systemUserSessionSupplier)
public static QScheduleManager initInstance(QInstance qInstance, Supplier<QSession> systemUserSessionSupplier) throws QException
{
if(qScheduleManager == null)
{
qScheduleManager = new QScheduleManager(qInstance, systemUserSessionSupplier);
/////////////////////////////////////////////////////////////////
// if the instance doesn't have any schedulable types defined, //
// then go ahead and add the default set that qqq knows about //
/////////////////////////////////////////////////////////////////
if(CollectionUtils.nullSafeIsEmpty(qInstance.getSchedulableTypes()))
{
defineDefaultSchedulableTypesInInstance(qInstance);
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// initialize the scheduler(s) we're configured to use //
// do this, even if we won't start them - so, for example, a web server can still be aware of schedules in the application //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
for(QSchedulerMetaData schedulerMetaData : CollectionUtils.nonNullMap(qInstance.getSchedulers()).values())
{
QSchedulerInterface scheduler = schedulerMetaData.initSchedulerInstance(qInstance, systemUserSessionSupplier);
qScheduleManager.schedulers.put(schedulerMetaData.getName(), scheduler);
}
}
return (qScheduleManager);
}
/*******************************************************************************
**
*******************************************************************************/
public static void defineDefaultSchedulableTypesInInstance(QInstance qInstance)
{
qInstance.addSchedulableType(new SchedulableType().withName(ScheduledJobType.PROCESS.getId()).withRunner(new QCodeReference(SchedulableProcessRunner.class)));
qInstance.addSchedulableType(new SchedulableType().withName(ScheduledJobType.QUEUE_PROCESSOR.getId()).withRunner(new QCodeReference(SchedulableSQSQueueRunner.class)));
qInstance.addSchedulableType(new SchedulableType().withName(ScheduledJobType.TABLE_AUTOMATIONS.getId()).withRunner(new QCodeReference(SchedulableTableAutomationsRunner.class)));
}
/*******************************************************************************
** Singleton accessor
*******************************************************************************/
@ -117,29 +157,20 @@ public class QScheduleManager
*******************************************************************************/
public void start() throws QException
{
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// initialize the scheduler(s) we're configured to use //
// do this, even if we won't start them - so, for example, a web server can still be aware of schedules in the application //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
for(QSchedulerMetaData schedulerMetaData : CollectionUtils.nonNullMap(qInstance.getSchedulers()).values())
{
QSchedulerInterface scheduler = schedulerMetaData.initSchedulerInstance(qInstance, systemUserSessionSupplier);
schedulers.put(schedulerMetaData.getName(), scheduler);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////
// now, exist w/o setting up schedules and not starting schedules, if schedule manager isn't enabled here //
////////////////////////////////////////////////////////////////////////////////////////////////////////////
//////////////////////////////////////////////////////////////////////////
// exit w/o starting schedulers, if schedule manager isn't enabled here //
//////////////////////////////////////////////////////////////////////////
if(!new QMetaDataVariableInterpreter().getBooleanFromPropertyOrEnvironment("qqq.scheduleManager.enabled", "QQQ_SCHEDULE_MANAGER_ENABLED", true))
{
LOG.info("Not starting ScheduleManager per settings.");
schedulers.values().forEach(s -> s.doNotStart());
return;
}
/////////////////////////////////////////////////////////////////////////////////////////////////
// ensure that everything which should be scheduled is scheduled, in the appropriate scheduler //
/////////////////////////////////////////////////////////////////////////////////////////////////
QContext.withTemporaryContext(new CapturedContext(qInstance, systemUserSessionSupplier.get()), () -> setupSchedules());
QContext.withTemporaryContext(new CapturedContext(qInstance, systemUserSessionSupplier.get()), () -> setupAllSchedules());
//////////////////////////
// start each scheduler //
@ -172,7 +203,7 @@ public class QScheduleManager
/*******************************************************************************
**
*******************************************************************************/
private void setupSchedules()
public void setupAllSchedules() throws QException
{
/////////////////////////////////////////////
// read dynamic schedules //
@ -199,20 +230,27 @@ public class QScheduleManager
/////////////////////////////////////////////////////////
schedulers.values().forEach(s -> s.startOfSetupSchedules());
//////////////////////////////////
// schedule all queue providers //
//////////////////////////////////
for(QQueueProviderMetaData queueProvider : qInstance.getQueueProviders().values())
/////////////////////////
// schedule all queues //
/////////////////////////
for(QQueueMetaData queue : qInstance.getQueues().values())
{
setupQueueProvider(queueProvider);
if(queue.getSchedule() != null)
{
setupQueue(queue);
}
}
///////////////////////////////////////
// schedule all automation providers //
///////////////////////////////////////
for(QAutomationProviderMetaData automationProvider : qInstance.getAutomationProviders().values())
////////////////////////////////////////
// schedule all tables w/ automations //
////////////////////////////////////////
for(QTableMetaData table : qInstance.getTables().values())
{
setupAutomationProviderPerTable(automationProvider);
QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails != null && automationDetails.getSchedule() != null)
{
setupTableAutomations(table);
}
}
/////////////////////////////////////////
@ -253,47 +291,60 @@ public class QScheduleManager
/*******************************************************************************
**
*******************************************************************************/
public void setupScheduledJob(ScheduledJob scheduledJob)
public void setupScheduledJob(ScheduledJob scheduledJob) throws QException
{
///////////////////////////////////////////////////////////////////////////////////////////
// non-active jobs should be deleted from the scheduler. they get re-added //
// if they get re-activated. but we don't want to rely on (e.g., for quartz) the paused //
// state to be drive by is-active. else, devops-pause & unpause ops would clobber //
// scheduled-job record facts //
///////////////////////////////////////////////////////////////////////////////////////////
BasicSchedulableIdentity schedulableIdentity = SchedulableIdentityFactory.of(scheduledJob);
////////////////////////////////////////////////////////////////////////////////
// non-active jobs should be deleted from the scheduler. they get re-added //
// if they get re-activated. but we don't want to rely on (e.g., for quartz) //
// the paused state to be drive by is-active. else, devops-pause & unpause //
// operations would clobber scheduled-job record facts //
////////////////////////////////////////////////////////////////////////////////
if(!scheduledJob.getIsActive())
{
unscheduleScheduledJob(scheduledJob);
return;
}
QSchedulerInterface scheduler = getScheduler(scheduledJob.getSchedulerName());
String exceptionSuffix = "in scheduledJob [" + scheduledJob.getId() + "]";
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// setup schedule meta-data object based on schedule data in the scheduled job - throwing if not well populated //
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
if(scheduledJob.getRepeatSeconds() == null && !StringUtils.hasContent(scheduledJob.getCronExpression()))
{
throw (new QException("Missing a schedule (cronString or repeatSeconds) " + exceptionSuffix));
}
QScheduleMetaData scheduleMetaData = new QScheduleMetaData();
scheduleMetaData.setCronExpression(scheduledJob.getCronExpression());
scheduleMetaData.setCronTimeZoneId(scheduledJob.getCronTimeZoneId());
scheduleMetaData.setRepeatSeconds(scheduledJob.getRepeatSeconds());
switch(ScheduledJobType.getById(scheduledJob.getType()))
/////////////////////////////////
// get & validate the job type //
/////////////////////////////////
if(!StringUtils.hasContent(scheduledJob.getType()))
{
case PROCESS ->
{
Map<String, String> paramMap = scheduledJob.getJobParametersMap();
String processName = paramMap.get("processName");
QProcessMetaData process = qInstance.getProcess(processName);
// todo - variants... serial vs parallel?
scheduler.setupProcess(process, null, scheduleMetaData, true);
}
case QUEUE_PROCESSOR ->
{
throw new NotImplementedException("ScheduledJob queue processors are not yet implemented...");
}
case TABLE_AUTOMATIONS ->
{
throw new NotImplementedException("ScheduledJob table automations are not yet implemented...");
}
default -> throw new IllegalStateException("Unexpected value: " + ScheduledJobType.getById(scheduledJob.getType()));
throw (new QException("Missing a type " + exceptionSuffix));
}
ScheduledJobType scheduledJobType = ScheduledJobType.getById(scheduledJob.getType());
if(scheduledJobType == null)
{
throw (new QException("Unrecognized type [" + scheduledJob.getType() + "] " + exceptionSuffix));
}
QSchedulerInterface scheduler = getScheduler(scheduledJob.getSchedulerName());
Map<String, Serializable> paramMap = new HashMap<>(scheduledJob.getJobParametersMap());
SchedulableType schedulableType = qInstance.getSchedulableType(scheduledJob.getType());
SchedulableRunner runner = QCodeLoader.getAdHoc(SchedulableRunner.class, schedulableType.getRunner());
runner.validateParams(schedulableIdentity, new HashMap<>(paramMap));
scheduler.setupSchedulable(schedulableIdentity, schedulableType, paramMap, scheduleMetaData, true);
}
@ -301,29 +352,34 @@ public class QScheduleManager
/*******************************************************************************
**
*******************************************************************************/
public void unscheduleScheduledJob(ScheduledJob scheduledJob)
public void unscheduleAll()
{
schedulers.values().forEach(s ->
{
try
{
s.unscheduleAll();
}
catch(Exception e)
{
LOG.warn("Error unscheduling everything in scheduler " + s, e);
}
});
}
/*******************************************************************************
**
*******************************************************************************/
public void unscheduleScheduledJob(ScheduledJob scheduledJob) throws QException
{
QSchedulerInterface scheduler = getScheduler(scheduledJob.getSchedulerName());
switch(ScheduledJobType.getById(scheduledJob.getType()))
{
case PROCESS ->
{
Map<String, String> paramMap = scheduledJob.getJobParametersMap();
String processName = paramMap.get("processName");
QProcessMetaData process = qInstance.getProcess(processName);
scheduler.unscheduleProcess(process);
}
case QUEUE_PROCESSOR ->
{
throw new NotImplementedException("ScheduledJob queue processors are not yet implemented...");
}
case TABLE_AUTOMATIONS ->
{
throw new NotImplementedException("ScheduledJob table automations are not yet implemented...");
}
default -> throw new IllegalStateException("Unexpected value: " + ScheduledJobType.getById(scheduledJob.getType()));
}
BasicSchedulableIdentity schedulableIdentity = SchedulableIdentityFactory.of(scheduledJob);
SchedulableType schedulableType = qInstance.getSchedulableType(scheduledJob.getType());
scheduler.unscheduleSchedulable(schedulableIdentity, schedulableType);
}
@ -331,28 +387,45 @@ public class QScheduleManager
/*******************************************************************************
**
*******************************************************************************/
private void setupProcess(QProcessMetaData process)
private void setupProcess(QProcessMetaData process) throws QException
{
QScheduleMetaData scheduleMetaData = process.getSchedule();
if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy()))
BasicSchedulableIdentity schedulableIdentity = SchedulableIdentityFactory.of(process);
QSchedulerInterface scheduler = getScheduler(process.getSchedule().getSchedulerName());
boolean allowedToStart = SchedulerUtils.allowedToStart(process.getName());
Map<String, String> paramMap = new HashMap<>();
paramMap.put("processName", process.getName());
SchedulableType schedulableType = qInstance.getSchedulableType(ScheduledJobType.PROCESS.getId());
if(process.getVariantBackend() == null || VariantRunStrategy.SERIAL.equals(process.getVariantRunStrategy()))
{
///////////////////////////////////////////////
// if no variants, or variant is serial mode //
///////////////////////////////////////////////
setupProcess(process, null);
scheduler.setupSchedulable(schedulableIdentity, schedulableType, new HashMap<>(paramMap), process.getSchedule(), allowedToStart);
}
else if(QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy()))
else if(VariantRunStrategy.PARALLEL.equals(process.getVariantRunStrategy()))
{
/////////////////////////////////////////////////////////////////////////////////////////////////////
// if this a "parallel", which for example means we want to have a thread for each backend variant //
// running at the same time, get the variant records and schedule each separately //
/////////////////////////////////////////////////////////////////////////////////////////////////////
QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend());
QBackendMetaData backendMetaData = qInstance.getBackend(process.getVariantBackend());
for(QRecord qRecord : CollectionUtils.nonNullList(SchedulerUtils.getBackendVariantFilteredRecords(process)))
{
try
{
setupProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField())));
HashMap<String, Serializable> parameters = new HashMap<>(paramMap);
HashMap<String, Serializable> variantMap = new HashMap<>(Map.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField())));
parameters.put("backendVariantData", variantMap);
String identity = schedulableIdentity.getIdentity() + ";" + backendMetaData.getVariantOptionsTableTypeValue() + "=" + qRecord.getValue(backendMetaData.getVariantOptionsTableIdField());
String description = schedulableIdentity.getDescription() + " for variant: " + backendMetaData.getVariantOptionsTableTypeValue() + "=" + qRecord.getValue(backendMetaData.getVariantOptionsTableIdField());
BasicSchedulableIdentity variantIdentity = new BasicSchedulableIdentity(identity, description);
scheduler.setupSchedulable(variantIdentity, schedulableType, parameters, process.getSchedule(), allowedToStart);
}
catch(Exception e)
{
@ -362,7 +435,7 @@ public class QScheduleManager
}
else
{
LOG.error("Unsupported Schedule Run Strategy [" + process.getSchedule().getVariantRunStrategy() + "] was provided.");
LOG.error("Unsupported Schedule Run Strategy [" + process.getVariantRunStrategy() + "] was provided.");
}
}
@ -371,71 +444,25 @@ public class QScheduleManager
/*******************************************************************************
**
*******************************************************************************/
private void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData)
private void setupTableAutomations(QTableMetaData table) throws QException
{
QSchedulerInterface scheduler = getScheduler(process.getSchedule().getSchedulerName());
scheduler.setupProcess(process, backendVariantData, process.getSchedule(), SchedulerUtils.allowedToStart(process));
}
SchedulableType schedulableType = qInstance.getSchedulableType(ScheduledJobType.TABLE_AUTOMATIONS.getId());
QTableAutomationDetails automationDetails = table.getAutomationDetails();
QSchedulerInterface scheduler = getScheduler(automationDetails.getSchedule().getSchedulerName());
List<PollingAutomationPerTableRunner.TableActionsInterface> tableActionList = PollingAutomationPerTableRunner.getTableActions(qInstance, automationDetails.getProviderName())
.stream().filter(ta -> ta.tableName().equals(table.getName()))
.toList();
/*******************************************************************************
**
*******************************************************************************/
private void setupQueueProvider(QQueueProviderMetaData queueProvider)
{
switch(queueProvider.getType())
{
case SQS:
setupSqsProvider((SQSQueueProviderMetaData) queueProvider);
break;
default:
throw new IllegalArgumentException("Unhandled queue provider type: " + queueProvider.getType());
}
}
/*******************************************************************************
**
*******************************************************************************/
private void setupSqsProvider(SQSQueueProviderMetaData queueProvider)
{
boolean allowedToStartProvider = SchedulerUtils.allowedToStart(queueProvider);
for(QQueueMetaData queue : qInstance.getQueues().values())
{
QSchedulerInterface scheduler = getScheduler(queue.getSchedule().getSchedulerName());
boolean allowedToStart = allowedToStartProvider && SchedulerUtils.allowedToStart(queue.getName());
if(queueProvider.getName().equals(queue.getProviderName()))
{
scheduler.setupSqsPoller(queueProvider, queue, queue.getSchedule(), allowedToStart);
}
}
}
/*******************************************************************************
**
*******************************************************************************/
private void setupAutomationProviderPerTable(QAutomationProviderMetaData automationProvider)
{
boolean allowedToStartProvider = SchedulerUtils.allowedToStart(automationProvider);
///////////////////////////////////////////////////////////////////////////////////
// ask the PollingAutomationPerTableRunner how many threads of itself need setup //
// then schedule each one of them. //
///////////////////////////////////////////////////////////////////////////////////
List<PollingAutomationPerTableRunner.TableActionsInterface> tableActionList = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActionsInterface tableActions : tableActionList)
{
boolean allowedToStart = allowedToStartProvider && SchedulerUtils.allowedToStart(tableActions.tableName());
SchedulableIdentity schedulableIdentity = SchedulableIdentityFactory.of(tableActions);
boolean allowedToStart = SchedulerUtils.allowedToStart(table.getName());
QScheduleMetaData schedule = tableActions.tableAutomationDetails().getSchedule();
QSchedulerInterface scheduler = getScheduler(schedule.getSchedulerName());
scheduler.setupTableAutomation(automationProvider, tableActions, schedule, allowedToStart);
Map<String, String> paramMap = new HashMap<>();
paramMap.put("tableName", tableActions.tableName());
paramMap.put("automationStatus", tableActions.status().name());
scheduler.setupSchedulable(schedulableIdentity, schedulableType, new HashMap<>(paramMap), automationDetails.getSchedule(), allowedToStart);
}
}
@ -444,12 +471,34 @@ public class QScheduleManager
/*******************************************************************************
**
*******************************************************************************/
private QSchedulerInterface getScheduler(String schedulerName)
private void setupQueue(QQueueMetaData queue) throws QException
{
SchedulableIdentity schedulableIdentity = SchedulableIdentityFactory.of(queue);
QSchedulerInterface scheduler = getScheduler(queue.getSchedule().getSchedulerName());
SchedulableType schedulableType = qInstance.getSchedulableType(ScheduledJobType.QUEUE_PROCESSOR.getId());
boolean allowedToStart = SchedulerUtils.allowedToStart(queue.getName());
Map<String, String> paramMap = new HashMap<>();
paramMap.put("queueName", queue.getName());
scheduler.setupSchedulable(schedulableIdentity, schedulableType, new HashMap<>(paramMap), queue.getSchedule(), allowedToStart);
}
/*******************************************************************************
**
*******************************************************************************/
private QSchedulerInterface getScheduler(String schedulerName) throws QException
{
if(!StringUtils.hasContent(schedulerName))
{
throw (new QException("Scheduler name was not given (and the concept of a default scheduler does not exist at this time)."));
}
QSchedulerInterface scheduler = schedulers.get(schedulerName);
if(scheduler == null)
{
throw new NotImplementedException("default scheduler...");
throw (new QException("Unrecognized schedulerName [" + schedulerName + "]"));
}
return (scheduler);

View File

@ -24,12 +24,10 @@ package com.kingsrook.qqq.backend.core.scheduler;
import java.io.Serializable;
import java.util.Map;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
/*******************************************************************************
@ -37,31 +35,41 @@ import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaDa
*******************************************************************************/
public interface QSchedulerInterface
{
/*******************************************************************************
**
*******************************************************************************/
void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**
*******************************************************************************/
void setupSqsPoller(SQSQueueProviderMetaData queueProvider, QQueueMetaData queue, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**
*******************************************************************************/
void setupTableAutomation(QAutomationProviderMetaData automationProvider, PollingAutomationPerTableRunner.TableActionsInterface tableActions, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**
*******************************************************************************/
void unscheduleProcess(QProcessMetaData process);
String getSchedulerName();
/*******************************************************************************
**
*******************************************************************************/
void start();
/*******************************************************************************
** called to indicate that the schedule manager is past its startup routine,
** but that the schedule should not actually be running in this process.
*******************************************************************************/
default void doNotStart()
{
}
/*******************************************************************************
**
*******************************************************************************/
void setupSchedulable(SchedulableIdentity schedulableIdentity, SchedulableType schedulableType, Map<String, Serializable> parameters, QScheduleMetaData schedule, boolean allowedToStart);
/*******************************************************************************
**
*******************************************************************************/
void unscheduleSchedulable(SchedulableIdentity schedulableIdentity, SchedulableType schedulableType);
/*******************************************************************************
**
*******************************************************************************/
void unscheduleAll() throws QException;
/*******************************************************************************
**
*******************************************************************************/
@ -77,7 +85,9 @@ public interface QSchedulerInterface
*******************************************************************************/
default void unInit()
{
/////////////////////
// noop by default //
/////////////////////
}
/*******************************************************************************
@ -85,7 +95,9 @@ public interface QSchedulerInterface
*******************************************************************************/
default void startOfSetupSchedules()
{
/////////////////////
// noop by default //
/////////////////////
}
/*******************************************************************************
@ -93,6 +105,9 @@ public interface QSchedulerInterface
*******************************************************************************/
default void endOfSetupSchedules()
{
/////////////////////
// noop by default //
/////////////////////
}
}

View File

@ -41,10 +41,10 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.TopLevelMetaDataInterface;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.VariantRunStrategy;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.collections.MapBuilder;
@ -56,17 +56,6 @@ public class SchedulerUtils
private static final QLogger LOG = QLogger.getLogger(SchedulerUtils.class);
/*******************************************************************************
**
*******************************************************************************/
public static boolean allowedToStart(TopLevelMetaDataInterface metaDataObject)
{
return (allowedToStart(metaDataObject.getName()));
}
/*******************************************************************************
**
*******************************************************************************/
@ -87,7 +76,7 @@ public class SchedulerUtils
/*******************************************************************************
**
*******************************************************************************/
public static void runProcess(QInstance qInstance, Supplier<QSession> sessionSupplier, QProcessMetaData process, Map<String, Serializable> backendVariantData)
public static void runProcess(QInstance qInstance, Supplier<QSession> sessionSupplier, QProcessMetaData process, Map<String, Serializable> backendVariantData, Map<String, Serializable> processInputValues)
{
String originalThreadName = Thread.currentThread().getName();
@ -95,11 +84,11 @@ public class SchedulerUtils
{
QContext.init(qInstance, sessionSupplier.get());
if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy()))
if(process.getVariantBackend() == null || VariantRunStrategy.PARALLEL.equals(process.getVariantRunStrategy()))
{
SchedulerUtils.executeSingleProcess(process, backendVariantData);
executeSingleProcess(process, backendVariantData, processInputValues);
}
else if(QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy()))
else if(VariantRunStrategy.SERIAL.equals(process.getVariantRunStrategy()))
{
///////////////////////////////////////////////////////////////////////////////////////////////////
// if this is "serial", which for example means we want to run each backend variant one after //
@ -109,9 +98,9 @@ public class SchedulerUtils
{
try
{
QScheduleMetaData scheduleMetaData = process.getSchedule();
QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend());
executeSingleProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField())));
QBackendMetaData backendMetaData = qInstance.getBackend(process.getVariantBackend());
Map<String, Serializable> thisVariantData = MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField()));
executeSingleProcess(process, thisVariantData, processInputValues);
}
catch(Exception e)
{
@ -136,7 +125,7 @@ public class SchedulerUtils
/*******************************************************************************
**
*******************************************************************************/
private static void executeSingleProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData) throws QException
private static void executeSingleProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, Map<String, Serializable> processInputValues) throws QException
{
if(backendVariantData != null)
{
@ -144,10 +133,16 @@ public class SchedulerUtils
}
Thread.currentThread().setName("ScheduledProcess>" + process.getName());
LOG.debug("Running Scheduled Process [" + process.getName() + "]");
LOG.debug("Running Scheduled Process [" + process.getName() + "] with values [" + processInputValues + "]");
RunProcessInput runProcessInput = new RunProcessInput();
runProcessInput.setProcessName(process.getName());
for(Map.Entry<String, Serializable> entry : CollectionUtils.nonNullMap(processInputValues).entrySet())
{
runProcessInput.withValue(entry.getKey(), entry.getValue());
}
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
QContext.pushAction(runProcessInput);
@ -166,8 +161,7 @@ public class SchedulerUtils
List<QRecord> records = null;
try
{
QScheduleMetaData scheduleMetaData = processMetaData.getSchedule();
QBackendMetaData backendMetaData = QContext.getQInstance().getBackend(scheduleMetaData.getVariantBackend());
QBackendMetaData backendMetaData = QContext.getQInstance().getBackend(processMetaData.getVariantBackend());
QueryInput queryInput = new QueryInput();
queryInput.setTableName(backendMetaData.getVariantOptionsTableName());

View File

@ -0,0 +1,90 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2023. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.processes;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.MetaDataProducerInterface;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.dashboard.nocode.WidgetHtmlLine;
import com.kingsrook.qqq.backend.core.model.metadata.layout.QIcon;
import com.kingsrook.qqq.backend.core.model.metadata.processes.NoCodeWidgetFrontendComponentMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.scheduler.QScheduleManager;
/*******************************************************************************
** Management process to reschedule all scheduled jobs (in all schedulers).
*******************************************************************************/
public class RescheduleAllJobsProcess implements BackendStep, MetaDataProducerInterface<QProcessMetaData>
{
/*******************************************************************************
**
*******************************************************************************/
@Override
public QProcessMetaData produce(QInstance qInstance) throws QException
{
return new QProcessMetaData()
.withName(getClass().getSimpleName())
.withLabel("Reschedule all Scheduled Jobs")
.withIcon(new QIcon("update"))
.withStepList(List.of(
new QFrontendStepMetaData()
.withName("confirm")
.withComponent(new NoCodeWidgetFrontendComponentMetaData()
.withOutput(new WidgetHtmlLine().withVelocityTemplate("Please confirm you wish to reschedule all jobs."))),
new QBackendStepMetaData()
.withName("execute")
.withCode(new QCodeReference(getClass())),
new QFrontendStepMetaData()
.withName("results")
.withComponent(new NoCodeWidgetFrontendComponentMetaData()
.withOutput(new WidgetHtmlLine().withVelocityTemplate("All jobs have been rescheduled.")))));
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
try
{
QScheduleManager.getInstance().setupAllSchedules();
}
catch(Exception e)
{
throw (new QException("Error setting up all scheduled jobs.", e));
}
}
}

View File

@ -0,0 +1,90 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2023. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.processes;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.MetaDataProducerInterface;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.dashboard.nocode.WidgetHtmlLine;
import com.kingsrook.qqq.backend.core.model.metadata.layout.QIcon;
import com.kingsrook.qqq.backend.core.model.metadata.processes.NoCodeWidgetFrontendComponentMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.scheduler.QScheduleManager;
/*******************************************************************************
** Management process to unschedule all scheduled jobs (in all schedulers).
*******************************************************************************/
public class UnscheduleAllJobsProcess implements BackendStep, MetaDataProducerInterface<QProcessMetaData>
{
/*******************************************************************************
**
*******************************************************************************/
@Override
public QProcessMetaData produce(QInstance qInstance) throws QException
{
return new QProcessMetaData()
.withName(getClass().getSimpleName())
.withLabel("Unschedule all Scheduled Jobs")
.withIcon(new QIcon("update_disabled"))
.withStepList(List.of(
new QFrontendStepMetaData()
.withName("confirm")
.withComponent(new NoCodeWidgetFrontendComponentMetaData()
.withOutput(new WidgetHtmlLine().withVelocityTemplate("Please confirm you wish to unschedule all jobs."))),
new QBackendStepMetaData()
.withName("execute")
.withCode(new QCodeReference(getClass())),
new QFrontendStepMetaData()
.withName("results")
.withComponent(new NoCodeWidgetFrontendComponentMetaData()
.withOutput(new WidgetHtmlLine().withVelocityTemplate("All jobs have been unscheduled.")))));
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
try
{
QScheduleManager.getInstance().unscheduleAll();
}
catch(Exception e)
{
throw (new QException("Error unscheduling all scheduled jobs.", e));
}
}
}

View File

@ -22,14 +22,15 @@
package com.kingsrook.qqq.backend.core.scheduler.quartz;
import com.kingsrook.qqq.backend.core.actions.queues.SQSQueuePoller;
import java.util.Map;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.context.CapturedContext;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import org.quartz.DisallowConcurrentExecution;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableRunner;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
@ -38,10 +39,9 @@ import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
**
*******************************************************************************/
@DisallowConcurrentExecution
public class QuartzSqsPollerJob implements Job
public class QuartzJobRunner implements Job
{
private static final QLogger LOG = QLogger.getLogger(QuartzSqsPollerJob.class);
private static final QLogger LOG = QLogger.getLogger(QuartzJobRunner.class);
@ -49,37 +49,28 @@ public class QuartzSqsPollerJob implements Job
**
*******************************************************************************/
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException
public void execute(JobExecutionContext context) throws JobExecutionException
{
String queueProviderName = null;
String queueName = null;
CapturedContext capturedContext = QContext.capture();
try
{
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
queueProviderName = jobDataMap.getString("queueProviderName");
queueName = jobDataMap.getString("queueName");
QInstance qInstance = QuartzScheduler.getInstance().getQInstance();
QuartzScheduler quartzScheduler = QuartzScheduler.getInstance();
QInstance qInstance = quartzScheduler.getQInstance();
QContext.init(qInstance, quartzScheduler.getSessionSupplier().get());
SQSQueuePoller sqsQueuePoller = new SQSQueuePoller();
sqsQueuePoller.setQueueProviderMetaData((SQSQueueProviderMetaData) qInstance.getQueueProvider(queueProviderName));
sqsQueuePoller.setQueueMetaData(qInstance.getQueue(queueName));
sqsQueuePoller.setQInstance(qInstance);
sqsQueuePoller.setSessionSupplier(QuartzScheduler.getInstance().getSessionSupplier());
SchedulableType schedulableType = qInstance.getSchedulableType(context.getJobDetail().getJobDataMap().getString("type"));
Map<String, Object> params = (Map<String, Object>) context.getJobDetail().getJobDataMap().get("params");
/////////////
// run it. //
/////////////
LOG.debug("Running quartz SQS Poller", logPair("queueName", queueName), logPair("queueProviderName", queueProviderName));
sqsQueuePoller.run();
SchedulableRunner schedulableRunner = QCodeLoader.getAdHoc(SchedulableRunner.class, schedulableType.getRunner());
schedulableRunner.run(params);
}
catch(Exception e)
{
LOG.warn("Error running SQS Poller", e, logPair("queueName", queueName), logPair("queueProviderName", queueProviderName));
LOG.warn("Error running QuartzJob", e, logPair("jobContext", context));
}
finally
{
QContext.clear();
QContext.init(capturedContext);
}
}

View File

@ -1,93 +0,0 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2023. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.quartz;
import java.io.Serializable;
import java.util.Map;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
**
*******************************************************************************/
@DisallowConcurrentExecution
public class QuartzRunProcessJob implements Job
{
private static final QLogger LOG = QLogger.getLogger(QuartzRunProcessJob.class);
/*******************************************************************************
**
*******************************************************************************/
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException
{
try
{
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
String processName = jobDataMap.getString("processName");
///////////////////////////////////////
// get the process from the instance //
///////////////////////////////////////
QInstance qInstance = QuartzScheduler.getInstance().getQInstance();
QProcessMetaData process = qInstance.getProcess(processName);
if(process == null)
{
LOG.warn("Could not find scheduled process in QInstance", logPair("processName", processName));
return;
}
///////////////////////////////////////////////
// if the job has variant data, get it ready //
///////////////////////////////////////////////
Map<String, Serializable> backendVariantData = null;
if(jobExecutionContext.getMergedJobDataMap().containsKey("backendVariantData"))
{
backendVariantData = (Map<String, Serializable>) jobExecutionContext.getMergedJobDataMap().get("backendVariantData");
}
/////////////
// run it. //
/////////////
LOG.debug("Running quartz process", logPair("processName", processName));
SchedulerUtils.runProcess(qInstance, QuartzScheduler.getInstance().getSessionSupplier(), qInstance.getProcess(processName), backendVariantData);
}
finally
{
QContext.clear();
}
}
}

View File

@ -37,16 +37,15 @@ import java.util.Set;
import java.util.TimeZone;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.scheduler.QSchedulerInterface;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import com.kingsrook.qqq.backend.core.utils.memoization.AnyKey;
import com.kingsrook.qqq.backend.core.utils.memoization.Memoization;
import org.quartz.CronExpression;
@ -78,15 +77,10 @@ public class QuartzScheduler implements QSchedulerInterface
private final QInstance qInstance;
private String schedulerName;
private Properties quartzProperties;
private Supplier<QSession> sessionSupplier;
private Scheduler scheduler;
private static final String GROUP_NAME_PROCESSES = "processes";
private static final String GROUP_NAME_SQS_QUEUES = "sqsQueues";
private static final String GROUP_NAME_TABLE_AUTOMATIONS = "tableAutomations";
/////////////////////////////////////////////////////////////////////////////////////////
// create memoization objects for some quartz-query functions, that we'll only want to //
@ -112,17 +106,24 @@ public class QuartzScheduler implements QSchedulerInterface
private List<QuartzJobAndTriggerWrapper> scheduledJobsAtStartOfSetup = new ArrayList<>();
private List<QuartzJobAndTriggerWrapper> scheduledJobsAtEndOfSetup = new ArrayList<>();
/////////////////////////////////////////////////////////////////////////////////
// track if the instance is past the server's startup routine. //
// for quartz - we'll use this to know if we're allowed to schedule jobs. //
// that is - during server startup, we don't want to the schedule & unschedule //
// routine, which could potentially have serve concurrency problems //
/////////////////////////////////////////////////////////////////////////////////
private boolean pastStartup = false;
/*******************************************************************************
** Constructor
**
*******************************************************************************/
private QuartzScheduler(QInstance qInstance, String schedulerName, Properties quartzProperties, Supplier<QSession> sessionSupplier)
private QuartzScheduler(QInstance qInstance, String schedulerName, Supplier<QSession> sessionSupplier)
{
this.qInstance = qInstance;
this.schedulerName = schedulerName;
this.quartzProperties = quartzProperties;
this.sessionSupplier = sessionSupplier;
}
@ -136,7 +137,7 @@ public class QuartzScheduler implements QSchedulerInterface
{
if(quartzScheduler == null)
{
quartzScheduler = new QuartzScheduler(qInstance, schedulerName, quartzProperties, sessionSupplier);
quartzScheduler = new QuartzScheduler(qInstance, schedulerName, sessionSupplier);
///////////////////////////////////////////////////////////
// Grab the Scheduler instance from the Factory //
@ -165,11 +166,24 @@ public class QuartzScheduler implements QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
@Override
public String getSchedulerName()
{
return (schedulerName);
}
/*******************************************************************************
**
*******************************************************************************/
public void start()
{
this.pastStartup = true;
try
{
//////////////////////
@ -185,6 +199,17 @@ public class QuartzScheduler implements QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
@Override
public void doNotStart()
{
this.pastStartup = true;
}
/*******************************************************************************
**
*******************************************************************************/
@ -225,20 +250,21 @@ public class QuartzScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, QScheduleMetaData schedule, boolean allowedToStart)
public void setupSchedulable(SchedulableIdentity schedulableIdentity, SchedulableType schedulableType, Map<String, Serializable> parameters, QScheduleMetaData schedule, boolean allowedToStart)
{
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("processName", process.getName());
if(backendVariantData != null)
////////////////////////////////////////////////////////////////////////////
// only actually schedule things if we're past the server startup routine //
////////////////////////////////////////////////////////////////////////////
if(!pastStartup)
{
jobData.put("backendVariantData", backendVariantData);
return;
}
scheduleJob(process.getName(), GROUP_NAME_PROCESSES, QuartzRunProcessJob.class, jobData, schedule, allowedToStart);
Map<String, Object> jobData = new HashMap<>();
jobData.put("params", parameters);
jobData.put("type", schedulableType.getName());
scheduleJob(schedulableIdentity, schedulableType.getName(), QuartzJobRunner.class, jobData, schedule, allowedToStart);
}
@ -249,12 +275,21 @@ public class QuartzScheduler implements QSchedulerInterface
@Override
public void startOfSetupSchedules()
{
////////////////////////////////////////////////////////////////////////////
// only actually schedule things if we're past the server startup routine //
////////////////////////////////////////////////////////////////////////////
if(!pastStartup)
{
return;
}
this.insideSetup = true;
this.allMemoizations.forEach(m -> m.setTimeout(Duration.ofSeconds(5)));
try
{
this.scheduledJobsAtStartOfSetup = queryQuartz();
this.scheduledJobsAtEndOfSetup = new ArrayList<>();
}
catch(Exception e)
{
@ -270,6 +305,14 @@ public class QuartzScheduler implements QSchedulerInterface
@Override
public void endOfSetupSchedules()
{
////////////////////////////////////////////////////////////////////////////
// only actually schedule things if we're past the server startup routine //
////////////////////////////////////////////////////////////////////////////
if(!pastStartup)
{
return;
}
this.insideSetup = false;
this.allMemoizations.forEach(m -> m.setTimeout(Duration.ofSeconds(0)));
@ -281,7 +324,7 @@ public class QuartzScheduler implements QSchedulerInterface
try
{
Set<JobKey> startJobKeys = this.scheduledJobsAtStartOfSetup.stream().map(w -> w.jobDetail().getKey()).collect(Collectors.toSet());
Set<JobKey> endJobKeys = scheduledJobsAtEndOfSetup.stream().map(w -> w.jobDetail().getKey()).collect(Collectors.toSet());
Set<JobKey> endJobKeys = this.scheduledJobsAtEndOfSetup.stream().map(w -> w.jobDetail().getKey()).collect(Collectors.toSet());
/////////////////////////////////////////////////////////////////////////////////////////////////////
// remove all 'end' keys from the set of start keys. any left-over start-keys need to be deleted. //
@ -310,18 +353,19 @@ public class QuartzScheduler implements QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
private boolean scheduleJob(String jobName, String groupName, Class<? extends Job> jobClass, Map<String, Object> jobData, QScheduleMetaData scheduleMetaData, boolean allowedToStart)
private boolean scheduleJob(SchedulableIdentity schedulableIdentity, String groupName, Class<? extends Job> jobClass, Map<String, Object> jobData, QScheduleMetaData scheduleMetaData, boolean allowedToStart)
{
try
{
/////////////////////////
// Define job instance //
/////////////////////////
JobKey jobKey = new JobKey(jobName, groupName);
JobKey jobKey = new JobKey(schedulableIdentity.getIdentity(), groupName);
JobDetail jobDetail = JobBuilder.newJob(jobClass)
.withIdentity(jobKey)
.withDescription(schedulableIdentity.getDescription())
.storeDurably()
.requestRecovery()
.requestRecovery() // todo - our frequent repeaters, maybe nice to say false here
.build();
jobDetail.getJobDataMap().putAll(jobData);
@ -366,10 +410,11 @@ public class QuartzScheduler implements QSchedulerInterface
// Define a Trigger for the schedule //
///////////////////////////////////////
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(new TriggerKey(jobName, groupName))
.withIdentity(new TriggerKey(schedulableIdentity.getIdentity(), groupName))
.withDescription(schedulableIdentity.getDescription() + " - " + getScheduleDescriptionForTrigger(scheduleMetaData))
.forJob(jobKey)
.withSchedule(scheduleBuilder)
.startAt(startAt)
// .startAt(startAt)
.build();
///////////////////////////////////////
@ -390,7 +435,7 @@ public class QuartzScheduler implements QSchedulerInterface
}
catch(Exception e)
{
LOG.warn("Error scheduling job", e, logPair("name", jobName), logPair("group", groupName));
LOG.warn("Error scheduling job", e, logPair("name", schedulableIdentity.getIdentity()), logPair("group", groupName));
return (false);
}
}
@ -400,17 +445,24 @@ public class QuartzScheduler implements QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
@Override
public void setupSqsPoller(SQSQueueProviderMetaData queueProvider, QQueueMetaData queue, QScheduleMetaData schedule, boolean allowedToStart)
private String getScheduleDescriptionForTrigger(QScheduleMetaData scheduleMetaData)
{
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("queueProviderName", queueProvider.getName());
jobData.put("queueName", queue.getName());
if(StringUtils.hasContent(scheduleMetaData.getDescription()))
{
return scheduleMetaData.getDescription();
}
scheduleJob(queue.getName(), GROUP_NAME_SQS_QUEUES, QuartzSqsPollerJob.class, jobData, schedule, allowedToStart);
if(StringUtils.hasContent(scheduleMetaData.getCronExpression()))
{
return "cron expression: " + scheduleMetaData.getCronExpression() + (StringUtils.hasContent(scheduleMetaData.getCronTimeZoneId()) ? " time zone: " + scheduleMetaData.getCronTimeZoneId() : "");
}
if(scheduleMetaData.getRepeatSeconds() != null)
{
return "repeat seconds: " + scheduleMetaData.getRepeatSeconds();
}
return "";
}
@ -419,17 +471,17 @@ public class QuartzScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupTableAutomation(QAutomationProviderMetaData automationProvider, PollingAutomationPerTableRunner.TableActionsInterface tableActions, QScheduleMetaData schedule, boolean allowedToStart)
public void unscheduleSchedulable(SchedulableIdentity schedulableIdentity, SchedulableType schedulableType)
{
/////////////////////////
// set up job data map //
/////////////////////////
Map<String, Object> jobData = new HashMap<>();
jobData.put("automationProviderName", automationProvider.getName());
jobData.put("tableName", tableActions.tableName());
jobData.put("automationStatus", tableActions.status().toString());
////////////////////////////////////////////////////////////////////////////
// only actually schedule things if we're past the server startup routine //
////////////////////////////////////////////////////////////////////////////
if(!pastStartup)
{
return;
}
scheduleJob(tableActions.tableName() + "." + tableActions.status(), GROUP_NAME_TABLE_AUTOMATIONS, QuartzTableAutomationsJob.class, jobData, schedule, allowedToStart);
deleteJob(new JobKey(schedulableIdentity.getIdentity(), schedulableType.getName()));
}
@ -438,9 +490,19 @@ public class QuartzScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void unscheduleProcess(QProcessMetaData process)
public void unscheduleAll() throws QException
{
deleteJob(new JobKey(process.getName(), GROUP_NAME_PROCESSES));
try
{
for(QuartzJobAndTriggerWrapper wrapper : queryQuartz())
{
deleteJob(new JobKey(wrapper.jobDetail().getKey().getName(), wrapper.jobDetail().getKey().getGroup()));
}
}
catch(Exception e)
{
throw (new QException("Error unscheduling all quartz jobs", e));
}
}
@ -455,9 +517,9 @@ public class QuartzScheduler implements QSchedulerInterface
{
boolean wasPaused = wasExistingJobPaused(jobKey);
this.scheduler.addJob(jobDetail, true); // note, true flag here replaces if already present.
this.scheduler.rescheduleJob(trigger.getKey(), trigger);
this.scheduler.scheduleJob(jobDetail, Set.of(trigger), true); // note, true flag here replaces if already present.
LOG.info("Re-scheduled job", logPair("jobKey", jobKey));
if(wasPaused)
{
LOG.info("Re-pausing job", logPair("jobKey", jobKey));
@ -488,7 +550,7 @@ public class QuartzScheduler implements QSchedulerInterface
}
}
return(false);
return (false);
}
@ -583,7 +645,20 @@ public class QuartzScheduler implements QSchedulerInterface
*******************************************************************************/
public void pauseAll() throws SchedulerException
{
this.scheduler.pauseAll();
///////////////////////////////////////////////////////////////////////////////
// lesson from past self to future self: //
// pauseAll creates paused-group entries for all jobs - //
// and so they can only really be resumed by a resumeAll call... //
// even newly scheduled things become paused. Which can be quite confusing. //
// so, we don't want pause all. //
///////////////////////////////////////////////////////////////////////////////
// this.scheduler.pauseAll();
List<QuartzJobAndTriggerWrapper> quartzJobAndTriggerWrappers = queryQuartz();
for(QuartzJobAndTriggerWrapper wrapper : quartzJobAndTriggerWrappers)
{
this.pauseJob(wrapper.jobDetail().getKey().getName(), wrapper.jobDetail().getKey().getGroup());
}
}
@ -593,6 +668,9 @@ public class QuartzScheduler implements QSchedulerInterface
*******************************************************************************/
public void resumeAll() throws SchedulerException
{
//////////////////////////////////////////////////
// this seems okay, even though pauseAll isn't. //
//////////////////////////////////////////////////
this.scheduler.resumeAll();
}
@ -603,6 +681,7 @@ public class QuartzScheduler implements QSchedulerInterface
*******************************************************************************/
public void pauseJob(String jobName, String groupName) throws SchedulerException
{
LOG.info("Request to pause job", logPair("jobName", jobName));
this.scheduler.pauseJob(new JobKey(jobName, groupName));
}
@ -613,6 +692,7 @@ public class QuartzScheduler implements QSchedulerInterface
*******************************************************************************/
public void resumeJob(String jobName, String groupName) throws SchedulerException
{
LOG.info("Request to resume job", logPair("jobName", jobName));
this.scheduler.resumeJob(new JobKey(jobName, groupName));
}

View File

@ -1,104 +0,0 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.quartz;
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
**
*******************************************************************************/
@DisallowConcurrentExecution
public class QuartzTableAutomationsJob implements Job
{
private static final QLogger LOG = QLogger.getLogger(QuartzTableAutomationsJob.class);
/*******************************************************************************
**
*******************************************************************************/
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException
{
String tableName = null;
String automationProviderName = null;
AutomationStatus automationStatus = null;
try
{
JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
tableName = jobDataMap.getString("tableName");
automationProviderName = jobDataMap.getString("automationProviderName");
automationStatus = AutomationStatus.valueOf(jobDataMap.getString("automationStatus"));
QInstance qInstance = QuartzScheduler.getInstance().getQInstance();
QTableMetaData table = qInstance.getTable(tableName);
if(table == null)
{
LOG.warn("Could not find table for automations in QInstance", logPair("tableName", tableName));
return;
}
QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails == null)
{
LOG.warn("Could not find automationDetails for table for automations in QInstance", logPair("tableName", tableName));
return;
}
///////////////////////////////////
// todo - sharded automations... //
///////////////////////////////////
PollingAutomationPerTableRunner.TableActionsInterface tableAction = new PollingAutomationPerTableRunner.TableActions(tableName, automationDetails, automationStatus);
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProviderName, QuartzScheduler.getInstance().getSessionSupplier(), tableAction);
/////////////
// run it. //
/////////////
LOG.debug("Running Table Automations", logPair("tableName", tableName), logPair("automationStatus", automationStatus));
runner.run();
}
catch(Exception e)
{
LOG.warn("Error running Table Automations", e, logPair("tableName", tableName), logPair("automationStatus", automationStatus));
}
finally
{
QContext.clear();
}
}
}

View File

@ -40,7 +40,7 @@ import com.kingsrook.qqq.backend.core.scheduler.quartz.QuartzScheduler;
/*******************************************************************************
**
** Manage process to pause all quartz jobs
*******************************************************************************/
public class PauseAllQuartzJobsProcess implements BackendStep, MetaDataProducerInterface<QProcessMetaData>
{
@ -55,6 +55,10 @@ public class PauseAllQuartzJobsProcess implements BackendStep, MetaDataProducerI
.withName(getClass().getSimpleName())
.withLabel("Pause All Quartz Jobs")
.withStepList(List.of(
new QFrontendStepMetaData()
.withName("confirm")
.withComponent(new NoCodeWidgetFrontendComponentMetaData()
.withOutput(new WidgetHtmlLine().withVelocityTemplate("Please confirm you wish to pause all quartz jobs."))),
new QBackendStepMetaData()
.withName("execute")
.withCode(new QCodeReference(getClass())),

View File

@ -40,7 +40,7 @@ import com.kingsrook.qqq.backend.core.scheduler.quartz.QuartzScheduler;
/*******************************************************************************
**
** Manage process to resume all quartz jobs
*******************************************************************************/
public class ResumeAllQuartzJobsProcess implements BackendStep, MetaDataProducerInterface<QProcessMetaData>
{
@ -55,6 +55,10 @@ public class ResumeAllQuartzJobsProcess implements BackendStep, MetaDataProducer
.withName(getClass().getSimpleName())
.withLabel("Resume All Quartz Jobs")
.withStepList(List.of(
new QFrontendStepMetaData()
.withName("confirm")
.withComponent(new NoCodeWidgetFrontendComponentMetaData()
.withOutput(new WidgetHtmlLine().withVelocityTemplate("Please confirm you wish to resume all quartz jobs."))),
new QBackendStepMetaData()
.withName("execute")
.withCode(new QCodeReference(getClass())),

View File

@ -0,0 +1,98 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
/*******************************************************************************
**
*******************************************************************************/
public class SchedulableType
{
private String name;
private QCodeReference runner;
/*******************************************************************************
** Getter for name
*******************************************************************************/
public String getName()
{
return (this.name);
}
/*******************************************************************************
** Setter for name
*******************************************************************************/
public void setName(String name)
{
this.name = name;
}
/*******************************************************************************
** Fluent setter for name
*******************************************************************************/
public SchedulableType withName(String name)
{
this.name = name;
return (this);
}
/*******************************************************************************
** Getter for runner
*******************************************************************************/
public QCodeReference getRunner()
{
return (this.runner);
}
/*******************************************************************************
** Setter for runner
*******************************************************************************/
public void setRunner(QCodeReference runner)
{
this.runner = runner;
}
/*******************************************************************************
** Fluent setter for runner
*******************************************************************************/
public SchedulableType withRunner(QCodeReference runner)
{
this.runner = runner;
return (this);
}
}

View File

@ -0,0 +1,121 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.identity;
import java.util.Objects;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
/*******************************************************************************
** Basic implementation of interface for identifying schedulable things
*******************************************************************************/
public class BasicSchedulableIdentity implements SchedulableIdentity
{
private String identity;
private String description;
/*******************************************************************************
** Constructor
**
*******************************************************************************/
public BasicSchedulableIdentity(String identity, String description)
{
if(!StringUtils.hasContent(identity))
{
throw (new IllegalArgumentException("Identity may not be null or empty."));
}
this.identity = identity;
this.description = description;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public boolean equals(Object o)
{
if(this == o)
{
return true;
}
if(o == null || getClass() != o.getClass())
{
return false;
}
BasicSchedulableIdentity that = (BasicSchedulableIdentity) o;
return Objects.equals(identity, that.identity);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public int hashCode()
{
return Objects.hash(identity);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public String getIdentity()
{
return identity;
}
/*******************************************************************************
** Getter for description
**
*******************************************************************************/
@Override
public String getDescription()
{
return description;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public String toString()
{
return getIdentity();
}
}

View File

@ -0,0 +1,55 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.identity;
/*******************************************************************************
** Unique identifier for a thing that can be scheduled
*******************************************************************************/
public interface SchedulableIdentity
{
/*******************************************************************************
**
*******************************************************************************/
@Override
boolean equals(Object that);
/*******************************************************************************
**
*******************************************************************************/
@Override
int hashCode();
/*******************************************************************************
**
*******************************************************************************/
String getIdentity();
/*******************************************************************************
** should NOT be part of equals & has code
*******************************************************************************/
String getDescription();
}

View File

@ -0,0 +1,96 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.identity;
import java.util.HashMap;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJob;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableRunner;
/*******************************************************************************
** Factory to produce SchedulableIdentity objects
*******************************************************************************/
public class SchedulableIdentityFactory
{
/*******************************************************************************
** Factory to create one of these for a scheduled job record
*******************************************************************************/
public static BasicSchedulableIdentity of(ScheduledJob scheduledJob)
{
String description = "";
ScheduledJobType scheduledJobType = ScheduledJobType.getById(scheduledJob.getType());
if(scheduledJobType != null)
{
try
{
SchedulableType schedulableType = QContext.getQInstance().getSchedulableType(scheduledJob.getType());
SchedulableRunner runner = QCodeLoader.getAdHoc(SchedulableRunner.class, schedulableType.getRunner());
description = runner.getDescription(new HashMap<>(scheduledJob.getJobParametersMap()));
}
catch(Exception e)
{
description = "type: " + scheduledJobType;
}
}
return new BasicSchedulableIdentity("scheduledJob:" + scheduledJob.getId(), description);
}
/*******************************************************************************
**
*******************************************************************************/
public static BasicSchedulableIdentity of(QProcessMetaData process)
{
return new BasicSchedulableIdentity("process:" + process.getName(), "Process: " + process.getName());
}
/*******************************************************************************
**
*******************************************************************************/
public static SchedulableIdentity of(QQueueMetaData queue)
{
return new BasicSchedulableIdentity("queue:" + queue.getName(), "Queue: " + queue.getName());
}
/*******************************************************************************
**
*******************************************************************************/
public static SchedulableIdentity of(PollingAutomationPerTableRunner.TableActionsInterface tableActions)
{
return new BasicSchedulableIdentity("tableAutomations:" + tableActions.tableName() + "." + tableActions.status(), "TableAutomations: " + tableActions.tableName() + "." + tableActions.status());
}
}

View File

@ -0,0 +1,188 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.runner;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerUtils;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
** Schedulable process runner - e.g., how a QProcess is run by a scheduler.
*******************************************************************************/
public class SchedulableProcessRunner implements SchedulableRunner
{
private static final QLogger LOG = QLogger.getLogger(SchedulableProcessRunner.class);
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(Map<String, Object> params)
{
String processName = ValueUtils.getValueAsString(params.get("processName"));
///////////////////////////////////////
// get the process from the instance //
///////////////////////////////////////
QInstance qInstance = QContext.getQInstance();
QProcessMetaData process = qInstance.getProcess(processName);
if(process == null)
{
LOG.warn("Could not find scheduled process in QInstance", logPair("processName", processName));
return;
}
///////////////////////////////////////////////
// if the job has variant data, get it ready //
///////////////////////////////////////////////
Map<String, Serializable> backendVariantData = null;
if(params.containsKey("backendVariantData"))
{
backendVariantData = (Map<String, Serializable>) params.get("backendVariantData");
}
Map<String, Serializable> processInputValues = buildProcessInputValuesMap(params, process);
/////////////
// run it. //
/////////////
LOG.debug("Running scheduled process", logPair("processName", processName));
SchedulerUtils.runProcess(qInstance, () -> QContext.getQSession(), qInstance.getProcess(processName), backendVariantData, processInputValues);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void validateParams(SchedulableIdentity schedulableIdentity, Map<String, Object> paramMap) throws QException
{
String processName = ValueUtils.getValueAsString(paramMap.get("processName"));
if(!StringUtils.hasContent(processName))
{
throw (new QException("Missing scheduledJobParameter with key [processName] in " + schedulableIdentity));
}
QProcessMetaData process = QContext.getQInstance().getProcess(processName);
if(process == null)
{
throw (new QException("Unrecognized processName [" + processName + "] in " + schedulableIdentity));
}
if(process.getSchedule() != null)
{
throw (new QException("Process [" + processName + "] has a schedule in its metaData - so it should not be dynamically scheduled via a scheduled job! " + schedulableIdentity));
}
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public String getDescription(Map<String, Object> params)
{
return "Process: " + params.get("processName");
}
/*******************************************************************************
**
*******************************************************************************/
private static Map<String, Serializable> buildProcessInputValuesMap(Map<String, Object> params, QProcessMetaData process)
{
Map<String, Serializable> processInputValues = new HashMap<>();
//////////////////////////////////////////////////////////////////////////////////////
// track which keys need processed - start by removing ones we know we handle above //
//////////////////////////////////////////////////////////////////////////////////////
Set<String> keys = new HashSet<>(params.keySet());
keys.remove("processName");
keys.remove("backendVariantData");
if(!keys.isEmpty())
{
//////////////////////////////////////////////////////////////////////////
// first make a pass going over the process's identified input fields - //
// getting values from the quartz job data map, and putting them into //
// the process input value map as the field's type (if we can) //
//////////////////////////////////////////////////////////////////////////
for(QFieldMetaData inputField : process.getInputFields())
{
String fieldName = inputField.getName();
if(params.containsKey(fieldName))
{
Object value = params.get(fieldName);
try
{
processInputValues.put(fieldName, ValueUtils.getValueAsFieldType(inputField.getType(), value));
keys.remove(fieldName);
}
catch(Exception e)
{
LOG.warn("Error getting process input value from quartz job data map", e, logPair("fieldName", fieldName), logPair("value", value));
}
}
}
////////////////////////////////////////////////////////////////////////////////////////
// if any values are left in the map (based on keys set that we're removing from) //
// then try to put those in the input map (assuming they can be cast to Serializable) //
////////////////////////////////////////////////////////////////////////////////////////
for(String key : keys)
{
Object value = params.get(key);
try
{
processInputValues.put(key, (Serializable) value);
}
catch(Exception e)
{
LOG.warn("Error getting process input value from quartz job data map", e, logPair("key", key), logPair("value", value));
}
}
}
return processInputValues;
}
}

View File

@ -0,0 +1,51 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.runner;
import java.util.Map;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
/*******************************************************************************
** Interface for different types of schedulabe things that can be run
*******************************************************************************/
public interface SchedulableRunner
{
/*******************************************************************************
**
*******************************************************************************/
void run(Map<String, Object> params);
/*******************************************************************************
**
*******************************************************************************/
void validateParams(SchedulableIdentity schedulableIdentity, Map<String, Object> paramMap) throws QException;
/*******************************************************************************
**
*******************************************************************************/
String getDescription(Map<String, Object> params);
}

View File

@ -0,0 +1,135 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.runner;
import java.util.Map;
import com.kingsrook.qqq.backend.core.actions.queues.SQSQueuePoller;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.scheduler.quartz.QuartzScheduler;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
** Schedulable SQSQueue runner - e.g., how an SQSQueuePoller is run by a scheduler.
*******************************************************************************/
public class SchedulableSQSQueueRunner implements SchedulableRunner
{
private static final QLogger LOG = QLogger.getLogger(SchedulableSQSQueueRunner.class);
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(Map<String, Object> params)
{
QInstance qInstance = QuartzScheduler.getInstance().getQInstance();
String queueName = ValueUtils.getValueAsString(params.get("queueName"));
if(!StringUtils.hasContent(queueName))
{
LOG.warn("Missing queueName in params.");
return;
}
QQueueMetaData queue = qInstance.getQueue(queueName);
if(queue == null)
{
LOG.warn("Unrecognized queueName [" + queueName + "]");
return;
}
QQueueProviderMetaData queueProvider = qInstance.getQueueProvider(queue.getProviderName());
if(!(queueProvider instanceof SQSQueueProviderMetaData))
{
LOG.warn("Queue [" + queueName + "] is of an unsupported queue provider type (not SQS)");
return;
}
SQSQueuePoller sqsQueuePoller = new SQSQueuePoller();
sqsQueuePoller.setQueueMetaData(queue);
sqsQueuePoller.setQueueProviderMetaData((SQSQueueProviderMetaData) queueProvider);
sqsQueuePoller.setQInstance(qInstance);
sqsQueuePoller.setSessionSupplier(QuartzScheduler.getInstance().getSessionSupplier());
/////////////
// run it. //
/////////////
LOG.debug("Running SQS Queue poller", logPair("queueName", queueName));
sqsQueuePoller.run();
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void validateParams(SchedulableIdentity schedulableIdentity, Map<String, Object> paramMap) throws QException
{
String queueName = ValueUtils.getValueAsString(paramMap.get("queueName"));
if(!StringUtils.hasContent(queueName))
{
throw (new QException("Missing scheduledJobParameter with key [queueName] in " + schedulableIdentity));
}
QQueueMetaData queue = QContext.getQInstance().getQueue(queueName);
if(queue == null)
{
throw (new QException("Unrecognized queueName [" + queueName + "] in " + schedulableIdentity));
}
QQueueProviderMetaData queueProvider = QContext.getQInstance().getQueueProvider(queue.getProviderName());
if(!(queueProvider instanceof SQSQueueProviderMetaData))
{
throw (new QException("Queue [" + queueName + "] is of an unsupported queue provider type (not SQS) in " + schedulableIdentity));
}
if(queue.getSchedule() != null)
{
throw (new QException("Queue [" + queueName + "] has a schedule in its metaData - so it should not be dynamically scheduled via a scheduled job! " + schedulableIdentity));
}
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public String getDescription(Map<String, Object> params)
{
return "Queue: " + params.get("queueName");
}
}

View File

@ -0,0 +1,168 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.schedulable.runner;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
import com.kingsrook.qqq.backend.core.scheduler.quartz.QuartzScheduler;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
** Schedulable TableAutomations runner - e.g., how a table automations are run
** by a scheduler.
*******************************************************************************/
public class SchedulableTableAutomationsRunner implements SchedulableRunner
{
private static final QLogger LOG = QLogger.getLogger(SchedulableTableAutomationsRunner.class);
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(Map<String, Object> params)
{
QInstance qInstance = QuartzScheduler.getInstance().getQInstance();
String tableName = ValueUtils.getValueAsString(params.get("tableName"));
if(!StringUtils.hasContent(tableName))
{
LOG.warn("Missing tableName in params.");
return;
}
QTableMetaData table = qInstance.getTable(tableName);
if(table == null)
{
LOG.warn("Unrecognized tableName [" + tableName + "]");
return;
}
AutomationStatus automationStatus = AutomationStatus.valueOf(ValueUtils.getValueAsString(params.get("automationStatus")));
/////////////
// run it. //
/////////////
LOG.debug("Running table automations", logPair("tableName", tableName), logPair(""));
QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails == null)
{
LOG.warn("Could not find automationDetails for table for automations in QInstance", logPair("tableName", tableName));
return;
}
///////////////////////////////////
// todo - sharded automations... //
///////////////////////////////////
PollingAutomationPerTableRunner.TableActionsInterface tableAction = new PollingAutomationPerTableRunner.TableActions(tableName, automationDetails, automationStatus);
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationDetails.getProviderName(), QuartzScheduler.getInstance().getSessionSupplier(), tableAction);
/////////////
// run it. //
/////////////
LOG.debug("Running Table Automations", logPair("tableName", tableName), logPair("automationStatus", automationStatus));
runner.run();
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void validateParams(SchedulableIdentity schedulableIdentity, Map<String, Object> paramMap) throws QException
{
String tableName = ValueUtils.getValueAsString(paramMap.get("tableName"));
if(!StringUtils.hasContent(tableName))
{
throw (new QException("Missing scheduledJobParameter with key [tableName] in " + schedulableIdentity));
}
String automationStatus = ValueUtils.getValueAsString(paramMap.get("automationStatus"));
if(!StringUtils.hasContent(automationStatus))
{
throw (new QException("Missing scheduledJobParameter with key [automationStatus] in " + schedulableIdentity));
}
QTableMetaData table = QContext.getQInstance().getTable(tableName);
if(table == null)
{
throw (new QException("Unrecognized tableName [" + tableName + "] in " + schedulableIdentity));
}
QTableAutomationDetails automationDetails = table.getAutomationDetails();
if(automationDetails == null)
{
throw (new QException("Table [" + tableName + "] does not have automationDetails in " + schedulableIdentity));
}
if(automationDetails.getSchedule() != null)
{
throw (new QException("Table [" + tableName + "] automationDetails has a schedule in its metaData - so it should not be dynamically scheduled via a scheduled job! " + schedulableIdentity));
}
QAutomationProviderMetaData automationProvider = QContext.getQInstance().getAutomationProvider(automationDetails.getProviderName());
List<PollingAutomationPerTableRunner.TableActionsInterface> tableActionList = PollingAutomationPerTableRunner.getTableActions(QContext.getQInstance(), automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActionsInterface tableActions : tableActionList)
{
if(tableActions.status().name().equals(automationStatus))
{
return;
}
}
/////////////////////////////////////////////////////////////////////////////////////
// if we get out of the loop, it means we didn't find a matching status - so throw //
/////////////////////////////////////////////////////////////////////////////////////
throw (new QException("Did not find table automation actions matching automationStatus [" + automationStatus + "] for table [" + tableName + "] in " + schedulableIdentity
+ " (Found: " + tableActionList.stream().map(ta -> ta.status().name()).collect(Collectors.joining(",")) + ")"));
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public String getDescription(Map<String, Object> params)
{
return "TableAutomations: " + params.get("tableName") + "." + params.get("automationStatus");
}
}

View File

@ -0,0 +1,87 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.simple;
import java.util.Map;
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
import com.kingsrook.qqq.backend.core.context.CapturedContext;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableRunner;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
**
*******************************************************************************/
public class SimpleJobRunner implements Runnable
{
private static final QLogger LOG = QLogger.getLogger(SimpleJobRunner.class);
private QInstance qInstance;
private SchedulableType schedulableType;
private Map<String, Object> params;
/*******************************************************************************
** Constructor
**
*******************************************************************************/
public SimpleJobRunner(QInstance qInstance, SchedulableType type, Map<String, Object> params)
{
this.qInstance = qInstance;
this.schedulableType = type;
this.params = params;
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run()
{
CapturedContext capturedContext = QContext.capture();
try
{
SimpleScheduler simpleScheduler = SimpleScheduler.getInstance(qInstance);
QContext.init(qInstance, simpleScheduler.getSessionSupplier().get());
SchedulableRunner schedulableRunner = QCodeLoader.getAdHoc(SchedulableRunner.class, schedulableType.getRunner());
schedulableRunner.run(params);
}
catch(Exception e)
{
LOG.warn("Error running SimpleScheduler job", e, logPair("params", params));
}
finally
{
QContext.init(capturedContext);
}
}
}

View File

@ -24,22 +24,21 @@ package com.kingsrook.qqq.backend.core.scheduler.simple;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner;
import com.kingsrook.qqq.backend.core.actions.queues.SQSQueuePoller;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.automation.QAutomationProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.scheduler.QSchedulerInterface;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerUtils;
import org.apache.commons.lang.NotImplementedException;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.SchedulableIdentity;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
@ -67,7 +66,7 @@ public class SimpleScheduler implements QSchedulerInterface
/////////////////////////////////////////////////////////////////////////////////////
private int delayIndex = 0;
private List<StandardScheduledExecutor> executors = new ArrayList<>();
private Map<SchedulableIdentity, StandardScheduledExecutor> executors = new LinkedHashMap<>();
@ -101,7 +100,7 @@ public class SimpleScheduler implements QSchedulerInterface
@Override
public void start()
{
for(StandardScheduledExecutor executor : executors)
for(StandardScheduledExecutor executor : executors.values())
{
executor.start();
}
@ -115,7 +114,7 @@ public class SimpleScheduler implements QSchedulerInterface
@Override
public void stopAsync()
{
for(StandardScheduledExecutor scheduledExecutor : executors)
for(StandardScheduledExecutor scheduledExecutor : executors.values())
{
scheduledExecutor.stopAsync();
}
@ -129,7 +128,7 @@ public class SimpleScheduler implements QSchedulerInterface
@Override
public void stop()
{
for(StandardScheduledExecutor scheduledExecutor : executors)
for(StandardScheduledExecutor scheduledExecutor : executors.values())
{
scheduledExecutor.stop();
}
@ -141,83 +140,18 @@ public class SimpleScheduler implements QSchedulerInterface
**
*******************************************************************************/
@Override
public void setupTableAutomation(QAutomationProviderMetaData automationProvider, PollingAutomationPerTableRunner.TableActionsInterface tableActions, QScheduleMetaData schedule, boolean allowedToStart)
public void setupSchedulable(SchedulableIdentity schedulableIdentity, SchedulableType schedulableType, Map<String, Serializable> parameters, QScheduleMetaData schedule, boolean allowedToStart)
{
if(!allowedToStart)
{
return;
}
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableActions);
StandardScheduledExecutor executor = new StandardScheduledExecutor(runner);
executor.setName(runner.getName());
SimpleJobRunner simpleJobRunner = new SimpleJobRunner(qInstance, schedulableType, new HashMap<>(parameters));
StandardScheduledExecutor executor = new StandardScheduledExecutor(simpleJobRunner);
executor.setName(schedulableIdentity.getIdentity());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void unscheduleProcess(QProcessMetaData process)
{
throw (new NotImplementedException("Unscheduling is not implemented in SimpleScheduler..."));
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void setupSqsPoller(SQSQueueProviderMetaData queueProvider, QQueueMetaData queue, QScheduleMetaData schedule, boolean allowedToStart)
{
if(!allowedToStart)
{
return;
}
QInstance scheduleManagerQueueInstance = qInstance;
Supplier<QSession> scheduleManagerSessionSupplier = sessionSupplier;
SQSQueuePoller sqsQueuePoller = new SQSQueuePoller();
sqsQueuePoller.setQueueProviderMetaData(queueProvider);
sqsQueuePoller.setQueueMetaData(queue);
sqsQueuePoller.setQInstance(scheduleManagerQueueInstance);
sqsQueuePoller.setSessionSupplier(scheduleManagerSessionSupplier);
StandardScheduledExecutor executor = new StandardScheduledExecutor(sqsQueuePoller);
executor.setName(queue.getName());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void setupProcess(QProcessMetaData process, Map<String, Serializable> backendVariantData, QScheduleMetaData schedule, boolean allowedToStart)
{
if(!allowedToStart)
{
return;
}
Runnable runProcess = () ->
{
SchedulerUtils.runProcess(qInstance, sessionSupplier, process, backendVariantData);
};
StandardScheduledExecutor executor = new StandardScheduledExecutor(runProcess);
executor.setName("process:" + process.getName());
setScheduleInExecutor(schedule, executor);
executors.add(executor);
executors.put(schedulableIdentity, executor);
}
@ -255,13 +189,34 @@ public class SimpleScheduler implements QSchedulerInterface
/*******************************************************************************
**
*******************************************************************************/
private QScheduleMetaData getDefaultSchedule()
@Override
public void unscheduleSchedulable(SchedulableIdentity schedulableIdentity, SchedulableType schedulableType)
{
QScheduleMetaData schedule;
schedule = new QScheduleMetaData()
.withInitialDelaySeconds(delayIndex++)
.withRepeatSeconds(60);
return schedule;
StandardScheduledExecutor executor = executors.get(schedulableIdentity);
if(executor != null)
{
LOG.info("Stopping job in simple scheduler", logPair("identity", schedulableIdentity));
executors.remove(schedulableIdentity);
executor.stop();
}
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public void unscheduleAll() throws QException
{
for(Map.Entry<SchedulableIdentity, StandardScheduledExecutor> entry : new HashSet<>(executors.entrySet()))
{
StandardScheduledExecutor executor = executors.remove(entry.getKey());
if(executor != null)
{
executor.stopAsync();
}
}
}
@ -278,21 +233,23 @@ public class SimpleScheduler implements QSchedulerInterface
/*******************************************************************************
** Getter for managedExecutors
** Getter for sessionSupplier
**
*******************************************************************************/
public List<StandardScheduledExecutor> getExecutors()
public Supplier<QSession> getSessionSupplier()
{
return executors;
return sessionSupplier;
}
/*******************************************************************************
** Getter for managedExecutors
**
*******************************************************************************/
static void resetSingleton()
public List<StandardScheduledExecutor> getExecutors()
{
return new ArrayList<>(executors.values());
}

View File

@ -0,0 +1,205 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler;
import java.util.ArrayList;
import java.util.Map;
import com.kingsrook.qqq.backend.core.BaseTest;
import com.kingsrook.qqq.backend.core.actions.automation.AutomationStatus;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QCollectingLogger;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJob;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobParameter;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobType;
import com.kingsrook.qqq.backend.core.scheduler.quartz.QuartzScheduler;
import com.kingsrook.qqq.backend.core.scheduler.quartz.QuartzTestUtils;
import com.kingsrook.qqq.backend.core.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/*******************************************************************************
** Unit test for QScheduleManager
*******************************************************************************/
class QScheduleManagerTest extends BaseTest
{
/*******************************************************************************
**
*******************************************************************************/
@AfterEach
void afterEach()
{
QLogger.deactivateCollectingLoggerForClass(QuartzScheduler.class);
try
{
QScheduleManager.getInstance().unInit();
}
catch(IllegalStateException ise)
{
/////////////////////////////////////////////////////////////////
// ok, might just mean that this test didn't init the instance //
/////////////////////////////////////////////////////////////////
}
try
{
QuartzScheduler.getInstance().unInit();
}
catch(IllegalStateException ise)
{
/////////////////////////////////////////////////////////////////
// ok, might just mean that this test didn't init the instance //
/////////////////////////////////////////////////////////////////
}
}
/*******************************************************************************
**
*******************************************************************************/
private ScheduledJob newScheduledJob(ScheduledJobType type, Map<String, String> params)
{
ScheduledJob scheduledJob = new ScheduledJob()
.withId(1)
.withIsActive(true)
.withSchedulerName(TestUtils.SIMPLE_SCHEDULER_NAME)
.withType(type.getId())
.withRepeatSeconds(1)
.withJobParameters(new ArrayList<>());
for(Map.Entry<String, String> entry : params.entrySet())
{
scheduledJob.getJobParameters().add(new ScheduledJobParameter().withKey(entry.getKey()).withValue(entry.getValue()));
}
return (scheduledJob);
}
/*******************************************************************************
**
*******************************************************************************/
@Test
void testSetupScheduledJobErrorCases() throws QException
{
QScheduleManager qScheduleManager = QScheduleManager.initInstance(QContext.getQInstance(), () -> QContext.getQSession());
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS, Map.of()).withRepeatSeconds(null)))
.hasMessageContaining("Missing a schedule");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS, Map.of()).withType(null)))
.hasMessageContaining("Missing a type");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS, Map.of()).withType("notAType")))
.hasMessageContaining("Unrecognized type");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS, Map.of())))
.hasMessageContaining("Missing scheduledJobParameter with key [processName]");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS, Map.of("processName", "notAProcess"))))
.hasMessageContaining("Unrecognized processName");
QContext.getQInstance().getProcess(TestUtils.PROCESS_NAME_BASEPULL).withSchedule(new QScheduleMetaData());
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS, Map.of("processName", TestUtils.PROCESS_NAME_BASEPULL))))
.hasMessageContaining("has a schedule in its metaData - so it should not be dynamically scheduled");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.QUEUE_PROCESSOR, Map.of())))
.hasMessageContaining("Missing scheduledJobParameter with key [queueName]");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.QUEUE_PROCESSOR, Map.of("queueName", "notAQueue"))))
.hasMessageContaining("Unrecognized queueName");
QContext.getQInstance().getQueue(TestUtils.TEST_SQS_QUEUE).withSchedule(new QScheduleMetaData());
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.QUEUE_PROCESSOR, Map.of("queueName", TestUtils.TEST_SQS_QUEUE))))
.hasMessageContaining("has a schedule in its metaData - so it should not be dynamically scheduled");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS, Map.of())))
.hasMessageContaining("Missing scheduledJobParameter with key [tableName]");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS, Map.of("tableName", "notATable"))))
.hasMessageContaining("Missing scheduledJobParameter with key [automationStatus]");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS, Map.of("tableName", "notATable", "automationStatus", AutomationStatus.PENDING_INSERT_AUTOMATIONS.name()))))
.hasMessageContaining("Unrecognized tableName");
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS, Map.of("tableName", TestUtils.TABLE_NAME_LINE_ITEM_EXTRINSIC, "automationStatus", AutomationStatus.PENDING_INSERT_AUTOMATIONS.name()))))
.hasMessageContaining("does not have automationDetails");
QContext.getQInstance().getTable(TestUtils.TABLE_NAME_PERSON_MEMORY).getAutomationDetails().withSchedule(null);
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS, Map.of("tableName", TestUtils.TABLE_NAME_PERSON_MEMORY, "automationStatus", "foobar"))))
.hasMessageContaining("Did not find table automation actions matching automationStatus")
.hasMessageContaining("Found: PENDING_INSERT_AUTOMATIONS,PENDING_UPDATE_AUTOMATIONS");
QContext.getQInstance().getTable(TestUtils.TABLE_NAME_PERSON_MEMORY).getAutomationDetails().withSchedule(new QScheduleMetaData());
assertThatThrownBy(() -> qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS, Map.of("tableName", TestUtils.TABLE_NAME_PERSON_MEMORY, "automationStatus", AutomationStatus.PENDING_INSERT_AUTOMATIONS.name()))))
.hasMessageContaining("has a schedule in its metaData - so it should not be dynamically scheduled");
}
/*******************************************************************************
**
*******************************************************************************/
@Test
void testSuccessfulScheduleWithQuartz() throws QException
{
QCollectingLogger quartzLogger = QLogger.activateCollectingLoggerForClass(QuartzScheduler.class);
QInstance qInstance = QContext.getQInstance();
QuartzTestUtils.setupInstanceForQuartzTests();
QScheduleManager qScheduleManager = QScheduleManager.initInstance(qInstance, () -> QContext.getQSession());
qScheduleManager.start();
qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.PROCESS,
Map.of("processName", TestUtils.PROCESS_NAME_GREET_PEOPLE))
.withId(2)
.withSchedulerName(QuartzTestUtils.QUARTZ_SCHEDULER_NAME));
qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.QUEUE_PROCESSOR,
Map.of("queueName", TestUtils.TEST_SQS_QUEUE))
.withId(3)
.withSchedulerName(QuartzTestUtils.QUARTZ_SCHEDULER_NAME));
qScheduleManager.setupScheduledJob(newScheduledJob(ScheduledJobType.TABLE_AUTOMATIONS,
Map.of("tableName", TestUtils.TABLE_NAME_PERSON_MEMORY, "automationStatus", AutomationStatus.PENDING_UPDATE_AUTOMATIONS.name()))
.withId(4)
.withSchedulerName(QuartzTestUtils.QUARTZ_SCHEDULER_NAME));
assertThat(quartzLogger.getCollectedMessages())
.anyMatch(l -> l.getMessage().matches(".*Scheduled new job.*PROCESS.scheduledJob:2.*"))
.anyMatch(l -> l.getMessage().matches(".*Scheduled new job.*QUEUE_PROCESSOR.scheduledJob:3.*"))
.anyMatch(l -> l.getMessage().matches(".*Scheduled new job.*TABLE_AUTOMATIONS.scheduledJob:4.*"));
}
}

View File

@ -0,0 +1,79 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
/*******************************************************************************
**
*******************************************************************************/
public class SchedulerTestUtils
{
/*******************************************************************************
**
*******************************************************************************/
public static QProcessMetaData buildTestProcess(String name, String schedulerName)
{
return new QProcessMetaData()
.withName(name)
.withSchedule(new QScheduleMetaData()
.withSchedulerName(schedulerName)
.withRepeatMillis(2)
.withInitialDelaySeconds(0))
.withStepList(List.of(new QBackendStepMetaData()
.withName("step")
.withCode(new QCodeReference(BasicStep.class))));
}
/*******************************************************************************
**
*******************************************************************************/
public static class BasicStep implements BackendStep
{
public static int counter = 0;
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
counter++;
}
}
}

View File

@ -22,23 +22,24 @@
package com.kingsrook.qqq.backend.core.scheduler.quartz;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.kingsrook.qqq.backend.core.BaseTest;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QCollectingLogger;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobType;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.scheduler.QScheduleManager;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerTestUtils;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerTestUtils.BasicStep;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.SchedulableType;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.identity.BasicSchedulableIdentity;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableSQSQueueRunner;
import com.kingsrook.qqq.backend.core.scheduler.schedulable.runner.SchedulableTableAutomationsRunner;
import com.kingsrook.qqq.backend.core.utils.SleepUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterEach;
@ -100,20 +101,21 @@ class QuartzSchedulerTest extends BaseTest
//////////////////////////////////////////////////////////////////////////////////////////////////////
// set these runners to use collecting logger, so we can assert that they did run, and didn't throw //
//////////////////////////////////////////////////////////////////////////////////////////////////////
QCollectingLogger quartzSqsPollerJobLog = QLogger.activateCollectingLoggerForClass(QuartzSqsPollerJob.class);
QCollectingLogger quartzTableAutomationsJobLog = QLogger.activateCollectingLoggerForClass(QuartzTableAutomationsJob.class);
QCollectingLogger quartzSqsPollerJobLog = QLogger.activateCollectingLoggerForClass(SchedulableSQSQueueRunner.class);
QCollectingLogger quartzTableAutomationsJobLog = QLogger.activateCollectingLoggerForClass(SchedulableTableAutomationsRunner.class);
//////////////////////////////////////////
// add a process we can run and observe //
//////////////////////////////////////////
qInstance.addProcess(buildTestProcess("testScheduledProcess"));
qInstance.addProcess(SchedulerTestUtils.buildTestProcess("testScheduledProcess", QuartzTestUtils.QUARTZ_SCHEDULER_NAME));
//////////////////////////////////////////////////////////////////////////////
// start the schedule manager, which will schedule things, and start quartz //
//////////////////////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////////////
// start the schedule manager, then ask it to set up all schedules //
/////////////////////////////////////////////////////////////////////
QSession qSession = QContext.getQSession();
QScheduleManager qScheduleManager = QScheduleManager.initInstance(qInstance, () -> qSession);
qScheduleManager.start();
qScheduleManager.setupAllSchedules();
//////////////////////////////////////////////////
// give a moment for the job to run a few times //
@ -128,7 +130,7 @@ class QuartzSchedulerTest extends BaseTest
// make sure poller ran, and didn't issue any warns //
//////////////////////////////////////////////////////
assertThat(quartzSqsPollerJobLog.getCollectedMessages())
.anyMatch(m -> m.getLevel().equals(Level.DEBUG) && m.getMessage().contains("Running quartz SQS Poller"))
.anyMatch(m -> m.getLevel().equals(Level.DEBUG) && m.getMessage().contains("Running SQS Queue poller"))
.noneMatch(m -> m.getLevel().equals(Level.WARN));
//////////////////////////////////////////////////////
@ -140,30 +142,13 @@ class QuartzSchedulerTest extends BaseTest
}
finally
{
QLogger.deactivateCollectingLoggerForClass(QuartzSqsPollerJob.class);
QLogger.deactivateCollectingLoggerForClass(SchedulableSQSQueueRunner.class);
QLogger.deactivateCollectingLoggerForClass(SchedulableTableAutomationsRunner.class);
}
}
/*******************************************************************************
**
*******************************************************************************/
private static QProcessMetaData buildTestProcess(String name)
{
return new QProcessMetaData()
.withName(name)
.withSchedule(new QScheduleMetaData()
.withSchedulerName(QuartzTestUtils.QUARTZ_SCHEDULER_NAME)
.withRepeatMillis(2)
.withInitialDelaySeconds(0))
.withStepList(List.of(new QBackendStepMetaData()
.withName("step")
.withCode(new QCodeReference(BasicStep.class))));
}
/*******************************************************************************
**
*******************************************************************************/
@ -171,45 +156,33 @@ class QuartzSchedulerTest extends BaseTest
void testRemovingNoLongerNeededJobsDuringSetupSchedules() throws SchedulerException
{
QInstance qInstance = QContext.getQInstance();
QScheduleManager.defineDefaultSchedulableTypesInInstance(qInstance);
QuartzTestUtils.setupInstanceForQuartzTests();
////////////////////////////
// put two jobs in quartz //
////////////////////////////
QProcessMetaData test1 = buildTestProcess("test1");
QProcessMetaData test2 = buildTestProcess("test2");
QProcessMetaData test1 = SchedulerTestUtils.buildTestProcess("test1", QuartzTestUtils.QUARTZ_SCHEDULER_NAME);
QProcessMetaData test2 = SchedulerTestUtils.buildTestProcess("test2", QuartzTestUtils.QUARTZ_SCHEDULER_NAME);
qInstance.addProcess(test1);
qInstance.addProcess(test2);
SchedulableType schedulableType = qInstance.getSchedulableType(ScheduledJobType.PROCESS.getId());
QuartzScheduler quartzScheduler = QuartzScheduler.initInstance(qInstance, QuartzTestUtils.QUARTZ_SCHEDULER_NAME, QuartzTestUtils.getQuartzProperties(), () -> QContext.getQSession());
quartzScheduler.setupProcess(test1, null, test1.getSchedule(), false);
quartzScheduler.setupProcess(test2, null, test2.getSchedule(), false);
quartzScheduler.start();
quartzScheduler.setupSchedulable(new BasicSchedulableIdentity("process:test1", null), schedulableType, Collections.emptyMap(), test1.getSchedule(), false);
quartzScheduler.setupSchedulable(new BasicSchedulableIdentity("process:test2", null), schedulableType, Collections.emptyMap(), test1.getSchedule(), false);
quartzScheduler.startOfSetupSchedules();
quartzScheduler.setupProcess(test1, null, test1.getSchedule(), false);
quartzScheduler.setupSchedulable(new BasicSchedulableIdentity("process:test1", null), schedulableType, Collections.emptyMap(), test1.getSchedule(), false);
quartzScheduler.endOfSetupSchedules();
List<QuartzJobAndTriggerWrapper> quartzJobAndTriggerWrappers = quartzScheduler.queryQuartz();
assertEquals(1, quartzJobAndTriggerWrappers.size());
assertEquals("test1", quartzJobAndTriggerWrappers.get(0).jobDetail().getKey().getName());
assertEquals("process:test1", quartzJobAndTriggerWrappers.get(0).jobDetail().getKey().getName());
}
/*******************************************************************************
**
*******************************************************************************/
public static class BasicStep implements BackendStep
{
static int counter = 0;
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
counter++;
}
}
}

View File

@ -79,6 +79,7 @@ class QuartzJobsProcessTest extends BaseTest
QSession qSession = QContext.getQSession();
QScheduleManager qScheduleManager = QScheduleManager.initInstance(qInstance, () -> qSession);
qScheduleManager.start();
qScheduleManager.setupAllSchedules();
}

View File

@ -22,27 +22,22 @@
package com.kingsrook.qqq.backend.core.scheduler.simple;
import java.util.List;
import java.util.concurrent.TimeUnit;
import com.kingsrook.qqq.backend.core.BaseTest;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.instances.QInstanceValidator;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.scheduler.QScheduleManager;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerTestUtils;
import com.kingsrook.qqq.backend.core.scheduler.SchedulerTestUtils.BasicStep;
import com.kingsrook.qqq.backend.core.utils.SleepUtils;
import com.kingsrook.qqq.backend.core.utils.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -75,12 +70,10 @@ class SimpleSchedulerTest extends BaseTest
qScheduleManager.start();
SimpleScheduler simpleScheduler = SimpleScheduler.getInstance(qInstance);
simpleScheduler.setSchedulerName(TestUtils.SIMPLE_SCHEDULER_NAME);
simpleScheduler.start();
assertThat(simpleScheduler.getExecutors()).isNotEmpty();
qScheduleManager.stop();
simpleScheduler.getExecutors().forEach(e -> assertEquals(StandardScheduledExecutor.RunningState.STOPPED, e.getRunningState()));
}
@ -96,16 +89,7 @@ class SimpleSchedulerTest extends BaseTest
qInstance.getAutomationProviders().clear();
qInstance.getQueueProviders().clear();
qInstance.addProcess(
new QProcessMetaData()
.withName("testScheduledProcess")
.withSchedule(new QScheduleMetaData()
.withSchedulerName(TestUtils.SIMPLE_SCHEDULER_NAME)
.withRepeatMillis(2)
.withInitialDelaySeconds(0))
.withStepList(List.of(new QBackendStepMetaData()
.withName("step")
.withCode(new QCodeReference(BasicStep.class)))));
qInstance.addProcess(SchedulerTestUtils.buildTestProcess("testScheduledProcess", TestUtils.SIMPLE_SCHEDULER_NAME));
BasicStep.counter = 0;
@ -124,21 +108,4 @@ class SimpleSchedulerTest extends BaseTest
}
/*******************************************************************************
**
*******************************************************************************/
public static class BasicStep implements BackendStep
{
static int counter = 0;
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
counter++;
}
}
}