From 3c2a34291a7ce492f4ee573456f66b1397370a6a Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Wed, 27 Dec 2023 08:47:03 -0600 Subject: [PATCH] Refactor, moving methods into SchedulerUtils, for use by other schedulers --- .../core/scheduler/ScheduleManager.java | 130 +------------ .../core/scheduler/SchedulerUtils.java | 176 ++++++++++++++++++ 2 files changed, 182 insertions(+), 124 deletions(-) create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/SchedulerUtils.java diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java index 375aebb9..cb7e0388 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/ScheduleManager.java @@ -29,20 +29,11 @@ import java.util.Map; import java.util.Objects; import java.util.function.Supplier; import com.kingsrook.qqq.backend.core.actions.automation.polling.PollingAutomationPerTableRunner; -import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction; import com.kingsrook.qqq.backend.core.actions.queues.SQSQueuePoller; -import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; import com.kingsrook.qqq.backend.core.context.QContext; -import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.instances.QMetaDataVariableInterpreter; import com.kingsrook.qqq.backend.core.logging.LogPair; import com.kingsrook.qqq.backend.core.logging.QLogger; -import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; -import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator; -import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria; -import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; -import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput; -import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput; import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData; import com.kingsrook.qqq.backend.core.model.metadata.QInstance; @@ -141,7 +132,7 @@ public class ScheduleManager for(QProcessMetaData process : qInstance.getProcesses().values()) { - if(process.getSchedule() != null && allowedToStart(process.getName())) + if(process.getSchedule() != null && SchedulerUtils.allowedToStart(process.getName())) { QScheduleMetaData scheduleMetaData = process.getSchedule(); if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy())) @@ -158,7 +149,7 @@ public class ScheduleManager // running at the same time, get the variant records and schedule each separately // ///////////////////////////////////////////////////////////////////////////////////////////////////// QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend()); - for(QRecord qRecord : CollectionUtils.nonNullList(getBackendVariantFilteredRecords(process))) + for(QRecord qRecord : CollectionUtils.nonNullList(SchedulerUtils.getBackendVariantFilteredRecords(process))) { try { @@ -188,34 +179,6 @@ public class ScheduleManager - /******************************************************************************* - ** - *******************************************************************************/ - private List getBackendVariantFilteredRecords(QProcessMetaData processMetaData) - { - List records = null; - try - { - QScheduleMetaData scheduleMetaData = processMetaData.getSchedule(); - QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend()); - - QueryInput queryInput = new QueryInput(); - queryInput.setTableName(backendMetaData.getVariantOptionsTableName()); - queryInput.setFilter(new QQueryFilter(new QFilterCriteria(backendMetaData.getVariantOptionsTableTypeField(), QCriteriaOperator.EQUALS, backendMetaData.getVariantOptionsTableTypeValue()))); - - QContext.init(qInstance, sessionSupplier.get()); - QueryOutput queryOutput = new QueryAction().execute(queryInput); - records = queryOutput.getRecords(); - } - catch(Exception e) - { - LOG.error("An error fetching variant data for process [" + processMetaData.getLabel() + "]", e); - } - - return (records); - } - - /******************************************************************************* ** @@ -229,7 +192,7 @@ public class ScheduleManager List tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName()); for(PollingAutomationPerTableRunner.TableActionsInterface tableAction : tableActions) { - if(allowedToStart(tableAction.tableName())) + if(SchedulerUtils.allowedToStart(tableAction.tableName())) { PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableAction); StandardScheduledExecutor executor = new StandardScheduledExecutor(runner); @@ -250,29 +213,12 @@ public class ScheduleManager - /******************************************************************************* - ** - *******************************************************************************/ - private boolean allowedToStart(String name) - { - String propertyName = "qqq.scheduleManager.onlyStartNamesMatching"; - String propertyValue = System.getProperty(propertyName, ""); - if(propertyValue.equals("")) - { - return (true); - } - - return (name.matches(propertyValue)); - } - - - /******************************************************************************* ** *******************************************************************************/ private void startQueueProvider(QQueueProviderMetaData queueProvider) { - if(allowedToStart(queueProvider.getName())) + if(SchedulerUtils.allowedToStart(queueProvider.getName())) { switch(queueProvider.getType()) { @@ -297,7 +243,7 @@ public class ScheduleManager for(QQueueMetaData queue : qInstance.getQueues().values()) { - if(queueProvider.getName().equals(queue.getProviderName()) && allowedToStart(queue.getName())) + if(queueProvider.getName().equals(queue.getProviderName()) && SchedulerUtils.allowedToStart(queue.getName())) { SQSQueuePoller sqsQueuePoller = new SQSQueuePoller(); sqsQueuePoller.setQueueProviderMetaData(queueProvider); @@ -332,46 +278,7 @@ public class ScheduleManager { Runnable runProcess = () -> { - String originalThreadName = Thread.currentThread().getName(); - - try - { - if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy())) - { - QContext.init(qInstance, sessionSupplier.get()); - executeSingleProcess(process, backendVariantData); - } - else if(QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy())) - { - /////////////////////////////////////////////////////////////////////////////////////////////////// - // if this is "serial", which for example means we want to run each backend variant one after // - // the other in the same thread so loop over these here so that they run in same lambda function // - /////////////////////////////////////////////////////////////////////////////////////////////////// - for(QRecord qRecord : getBackendVariantFilteredRecords(process)) - { - try - { - QContext.init(qInstance, sessionSupplier.get()); - QScheduleMetaData scheduleMetaData = process.getSchedule(); - QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend()); - executeSingleProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField()))); - } - catch(Exception e) - { - LOG.error("An error starting process [" + process.getLabel() + "], with backend variant data.", e, new LogPair("variantQRecord", qRecord)); - } - } - } - } - catch(Exception e) - { - LOG.warn("Exception thrown running scheduled process [" + process.getName() + "]", e); - } - finally - { - Thread.currentThread().setName(originalThreadName); - QContext.clear(); - } + SchedulerUtils.runProcess(qInstance, sessionSupplier, process, backendVariantData); }; StandardScheduledExecutor executor = new StandardScheduledExecutor(runProcess); @@ -387,31 +294,6 @@ public class ScheduleManager - /******************************************************************************* - ** - *******************************************************************************/ - private static void executeSingleProcess(QProcessMetaData process, Map backendVariantData) throws QException - { - if(backendVariantData != null) - { - QContext.getQSession().setBackendVariants(backendVariantData); - } - - Thread.currentThread().setName("ScheduledProcess>" + process.getName()); - LOG.debug("Running Scheduled Process [" + process.getName() + "]"); - - RunProcessInput runProcessInput = new RunProcessInput(); - runProcessInput.setProcessName(process.getName()); - runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); - - QContext.pushAction(runProcessInput); - - RunProcessAction runProcessAction = new RunProcessAction(); - runProcessAction.execute(runProcessInput); - } - - - /******************************************************************************* ** *******************************************************************************/ diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/SchedulerUtils.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/SchedulerUtils.java new file mode 100644 index 00000000..76f86394 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/scheduler/SchedulerUtils.java @@ -0,0 +1,176 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2023. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.scheduler; + + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; +import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction; +import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; +import com.kingsrook.qqq.backend.core.context.QContext; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.logging.LogPair; +import com.kingsrook.qqq.backend.core.logging.QLogger; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.QInstance; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData; +import com.kingsrook.qqq.backend.core.model.session.QSession; +import com.kingsrook.qqq.backend.core.utils.collections.MapBuilder; + + +/******************************************************************************* + ** + *******************************************************************************/ +public class SchedulerUtils +{ + private static final QLogger LOG = QLogger.getLogger(SchedulerUtils.class); + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static boolean allowedToStart(String name) + { + String propertyName = "qqq.scheduleManager.onlyStartNamesMatching"; + String propertyValue = System.getProperty(propertyName, ""); + if(propertyValue.equals("")) + { + return (true); + } + + return (name.matches(propertyValue)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static void runProcess(QInstance qInstance, Supplier sessionSupplier, QProcessMetaData process, Map backendVariantData) + { + String originalThreadName = Thread.currentThread().getName(); + + try + { + QContext.init(qInstance, sessionSupplier.get()); + + if(process.getSchedule().getVariantBackend() == null || QScheduleMetaData.RunStrategy.PARALLEL.equals(process.getSchedule().getVariantRunStrategy())) + { + SchedulerUtils.executeSingleProcess(process, backendVariantData); + } + else if(QScheduleMetaData.RunStrategy.SERIAL.equals(process.getSchedule().getVariantRunStrategy())) + { + /////////////////////////////////////////////////////////////////////////////////////////////////// + // if this is "serial", which for example means we want to run each backend variant one after // + // the other in the same thread so loop over these here so that they run in same lambda function // + /////////////////////////////////////////////////////////////////////////////////////////////////// + for(QRecord qRecord : getBackendVariantFilteredRecords(process)) + { + try + { + QScheduleMetaData scheduleMetaData = process.getSchedule(); + QBackendMetaData backendMetaData = qInstance.getBackend(scheduleMetaData.getVariantBackend()); + executeSingleProcess(process, MapBuilder.of(backendMetaData.getVariantOptionsTableTypeValue(), qRecord.getValue(backendMetaData.getVariantOptionsTableIdField()))); + } + catch(Exception e) + { + LOG.error("An error starting process [" + process.getLabel() + "], with backend variant data.", e, new LogPair("variantQRecord", qRecord)); + } + } + } + } + catch(Exception e) + { + LOG.warn("Exception thrown running scheduled process [" + process.getName() + "]", e); + } + finally + { + Thread.currentThread().setName(originalThreadName); + QContext.clear(); + } + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private static void executeSingleProcess(QProcessMetaData process, Map backendVariantData) throws QException + { + if(backendVariantData != null) + { + QContext.getQSession().setBackendVariants(backendVariantData); + } + + Thread.currentThread().setName("ScheduledProcess>" + process.getName()); + LOG.debug("Running Scheduled Process [" + process.getName() + "]"); + + RunProcessInput runProcessInput = new RunProcessInput(); + runProcessInput.setProcessName(process.getName()); + runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); + + QContext.pushAction(runProcessInput); + + RunProcessAction runProcessAction = new RunProcessAction(); + runProcessAction.execute(runProcessInput); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static List getBackendVariantFilteredRecords(QProcessMetaData processMetaData) + { + List records = null; + try + { + QScheduleMetaData scheduleMetaData = processMetaData.getSchedule(); + QBackendMetaData backendMetaData = QContext.getQInstance().getBackend(scheduleMetaData.getVariantBackend()); + + QueryInput queryInput = new QueryInput(); + queryInput.setTableName(backendMetaData.getVariantOptionsTableName()); + queryInput.setFilter(new QQueryFilter(new QFilterCriteria(backendMetaData.getVariantOptionsTableTypeField(), QCriteriaOperator.EQUALS, backendMetaData.getVariantOptionsTableTypeValue()))); + + QueryOutput queryOutput = new QueryAction().execute(queryInput); + records = queryOutput.getRecords(); + } + catch(Exception e) + { + LOG.error("An error fetching variant data for process [" + processMetaData.getLabel() + "]", e); + } + + return (records); + } + +}