Initial checkin of process & table customizer to help sync scheduled jobs for records in a table

This commit is contained in:
2025-06-12 15:11:14 -05:00
parent ffca465f04
commit 5db8cf9ca1
4 changed files with 849 additions and 0 deletions

View File

@ -0,0 +1,245 @@
/*
* Copyright © 2022-2024. ColdTrack <contact@coldtrack.com>. All Rights Reserved.
*/
package com.kingsrook.qqq.backend.core.scheduler.processes;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.MetaDataProducerInterface;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QSchedulerMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJob;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobParameter;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJobType;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
import com.kingsrook.qqq.backend.core.processes.implementations.tablesync.AbstractTableSyncTransformStep;
import com.kingsrook.qqq.backend.core.processes.implementations.tablesync.TableSyncProcess;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import com.kingsrook.qqq.backend.core.utils.collections.ListBuilder;
/*******************************************************************************
** Base class to manage creating scheduled jobs based on records in another table
**
** Expected to be used via BaseSyncToScheduledJobTableCustomizer - see its javadoc.
**
*******************************************************************************/
public abstract class AbstractRecordSyncToScheduledJobProcess extends AbstractTableSyncTransformStep implements MetaDataProducerInterface<QProcessMetaData>
{
private static final QLogger LOG = QLogger.getLogger(AbstractRecordSyncToScheduledJobProcess.class);
public static final String SCHEDULER_NAME_FIELD_NAME = "schedulerName";
/*******************************************************************************
**
*******************************************************************************/
@Override
public QProcessMetaData produce(QInstance qInstance) throws QException
{
QProcessMetaData processMetaData = TableSyncProcess.processMetaDataBuilder(false)
.withName(getClass().getSimpleName())
.withSyncTransformStepClass(getClass())
.withReviewStepRecordFields(List.of(
new QFieldMetaData(getRecordForeignKeyFieldName(), QFieldType.INTEGER).withPossibleValueSourceName(getRecordForeignKeyPossibleValueSourceName()),
new QFieldMetaData("cronExpression", QFieldType.STRING),
new QFieldMetaData("isActive", QFieldType.BOOLEAN)
))
.getProcessMetaData();
processMetaData.getBackendStep(StreamedETLWithFrontendProcess.STEP_NAME_PREVIEW).getInputMetaData()
.withField(new QFieldMetaData(SCHEDULER_NAME_FIELD_NAME, QFieldType.STRING));
return (processMetaData);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public QRecord populateRecordToStore(RunBackendStepInput runBackendStepInput, QRecord destinationRecord, QRecord sourceRecord) throws QException
{
ScheduledJob scheduledJob;
if(destinationRecord == null || destinationRecord.getValue("id") == null)
{
QInstance qInstance = QContext.getQInstance();
////////////////////////////////////////////////////////////////
// this is the table at which the scheduled job will point to //
////////////////////////////////////////////////////////////////
QTableMetaData sourceTableMetaData = qInstance.getTable(getSourceTableName());
String sourceTableId = String.valueOf(sourceRecord.getValueString(sourceTableMetaData.getPrimaryKeyField()));
String sourceTableJobKey = getSourceTableName() + "Id";
///////////////////////////////////////////////////////////
// this is the table that the scheduled record points to //
///////////////////////////////////////////////////////////
QTableMetaData recordForeignTableMetaData = qInstance.getTable(getRecordForeignKeyPossibleValueSourceName());
String sourceRecordForeignKeyId = sourceRecord.getValueString(getRecordForeignKeyFieldName());
////////////////////////////////////////////////////////////////////////
// need to do an insert - set lots of key values in the scheduled job //
////////////////////////////////////////////////////////////////////////
scheduledJob = new ScheduledJob();
scheduledJob.setSchedulerName(runBackendStepInput.getValueString(SCHEDULER_NAME_FIELD_NAME));
scheduledJob.setType(ScheduledJobType.PROCESS.name());
scheduledJob.setForeignKeyType(getSourceTableName());
scheduledJob.setForeignKeyValue(sourceTableId);
scheduledJob.setJobParameters(ListBuilder.of(
new ScheduledJobParameter().withKey("isScheduledJob").withValue("true"),
new ScheduledJobParameter().withKey("processName").withValue(getProcessNameScheduledJobParameter()),
new ScheduledJobParameter().withKey(sourceTableJobKey).withValue(sourceTableId),
new ScheduledJobParameter().withKey("recordId").withValue(ValueUtils.getValueAsString(sourceRecordForeignKeyId))
));
//////////////////////////////////////////////////////////////////////////
// make a call to allow subclasses to customize parts of the job record //
//////////////////////////////////////////////////////////////////////////
scheduledJob.setLabel(recordForeignTableMetaData.getLabel() + " " + sourceRecordForeignKeyId);
scheduledJob.setDescription("Job to run " + sourceTableMetaData.getLabel() + " Id " + sourceTableId
+ " (which runs for " + recordForeignTableMetaData.getLabel() + " Id " + sourceRecordForeignKeyId + ")");
}
else
{
//////////////////////////////////////////////////////////////////////////////////
// else doing an update - populate scheduled job entity from destination record //
//////////////////////////////////////////////////////////////////////////////////
scheduledJob = new ScheduledJob(destinationRecord);
}
//////////////////////////////////////////////////////////////////////////////////
// these fields sync on insert and update //
// todo - if no diffs, should we return null (to avoid changing quartz at all?) //
//////////////////////////////////////////////////////////////////////////////////
scheduledJob.setCronExpression(sourceRecord.getValueString("cronExpression"));
scheduledJob.setCronTimeZoneId(sourceRecord.getValueString("cronTimeZoneId"));
scheduledJob.setIsActive(true);
scheduledJob = customizeScheduledJob(scheduledJob, sourceRecord);
////////////////////////////////////////////////////////////////////
// try to make sure scheduler name is set (and fail if it isn't!) //
////////////////////////////////////////////////////////////////////
makeSureSchedulerNameIsSet(scheduledJob);
return scheduledJob.toQRecord();
}
/***************************************************************************
**
***************************************************************************/
protected void makeSureSchedulerNameIsSet(ScheduledJob scheduledJob) throws QException
{
if(!StringUtils.hasContent(scheduledJob.getSchedulerName()))
{
Map<String, QSchedulerMetaData> schedulers = QContext.getQInstance().getSchedulers();
if(schedulers.size() == 1)
{
scheduledJob.setSchedulerName(schedulers.keySet().iterator().next());
}
}
if(!StringUtils.hasContent(scheduledJob.getSchedulerName()))
{
String message = "Could not determine scheduler name for webhook scheduled job.";
LOG.warn(message);
throw (new QException(message));
}
}
/*******************************************************************************
**
*******************************************************************************/
protected ScheduledJob customizeScheduledJob(ScheduledJob scheduledJob, QRecord sourceRecord) throws QException
{
return (scheduledJob);
}
/*******************************************************************************
**
*******************************************************************************/
@Override
protected QQueryFilter getExistingRecordQueryFilter(RunBackendStepInput runBackendStepInput, List<Serializable> sourceKeyList)
{
return super.getExistingRecordQueryFilter(runBackendStepInput, sourceKeyList)
.withCriteria(new QFilterCriteria("foreignKeyType", QCriteriaOperator.EQUALS, getScheduledJobForeignKeyType()));
}
/*******************************************************************************
**
*******************************************************************************/
@Override
protected SyncProcessConfig getSyncProcessConfig()
{
return new SyncProcessConfig(getSourceTableName(), getSourceTableKeyField(), ScheduledJob.TABLE_NAME, "foreignKeyValue", true, true);
}
/*******************************************************************************
**
*******************************************************************************/
protected abstract String getScheduledJobForeignKeyType();
/*******************************************************************************
**
*******************************************************************************/
protected abstract String getRecordForeignKeyFieldName();
/*******************************************************************************
**
*******************************************************************************/
protected abstract String getRecordForeignKeyPossibleValueSourceName();
/*******************************************************************************
**
*******************************************************************************/
protected abstract String getSourceTableName();
/*******************************************************************************
**
*******************************************************************************/
protected abstract String getProcessNameScheduledJobParameter();
/*******************************************************************************
**
*******************************************************************************/
protected String getSourceTableKeyField()
{
return ("id");
}
}

View File

@ -0,0 +1,337 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2025. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.scheduler.processes;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizerInterface;
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
import com.kingsrook.qqq.backend.core.actions.processes.QProcessCallbackFactory;
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
import com.kingsrook.qqq.backend.core.actions.tables.DeleteAction;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.actions.AbstractActionInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.code.InitializableViaCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReferenceWithProperties;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.scheduledjobs.ScheduledJob;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
/*******************************************************************************
** an implementation of a TableCustomizer that runs a subclass of
** AbstractRecordSyncToScheduledJobProcess - to manage scheduledJob records that
** correspond to records in another table (e.g., a job for each Client)
**
** Easiest way to use is:
** - BaseSyncToScheduledJobTableCustomizer.setTableCustomizers(tableMetaData, new YourSyncScheduledJobProcessSubclass());
** which adds post-insert, -update, and -delete customizers to your table.
**
** If you need additional table customizer code in those slots, I suppose you could
** simply make your customizer create an instance of this class, set its
** properties, and run its appropriate postInsertOrUpdate/postDelete methods.
*******************************************************************************/
public class BaseSyncToScheduledJobTableCustomizer implements TableCustomizerInterface, InitializableViaCodeReference
{
private static final QLogger LOG = QLogger.getLogger(BaseSyncToScheduledJobTableCustomizer.class);
public static final String KEY_TABLE_NAME = "tableName";
public static final String KEY_SYNC_PROCESS_NAME = "syncProcessName";
public static final String KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE = "scheduledJobForeignKeyType";
private String tableName;
private String syncProcessName;
private String scheduledJobForeignKeyType;
/***************************************************************************
**
***************************************************************************/
public static void setTableCustomizers(QTableMetaData tableMetaData, AbstractRecordSyncToScheduledJobProcess syncProcess)
{
QCodeReference codeReference = new QCodeReferenceWithProperties(BaseSyncToScheduledJobTableCustomizer.class, Map.of(
KEY_TABLE_NAME, tableMetaData.getName(),
KEY_SYNC_PROCESS_NAME, syncProcess.getClass().getSimpleName(),
KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE, syncProcess.getScheduledJobForeignKeyType()
));
tableMetaData.withCustomizer(TableCustomizers.POST_INSERT_RECORD, codeReference);
tableMetaData.withCustomizer(TableCustomizers.POST_UPDATE_RECORD, codeReference);
tableMetaData.withCustomizer(TableCustomizers.POST_DELETE_RECORD, codeReference);
}
/***************************************************************************
**
***************************************************************************/
@Override
public void initialize(QCodeReference codeReference)
{
if(codeReference instanceof QCodeReferenceWithProperties codeReferenceWithProperties)
{
tableName = ValueUtils.getValueAsString(codeReferenceWithProperties.getProperties().get(KEY_TABLE_NAME));
syncProcessName = ValueUtils.getValueAsString(codeReferenceWithProperties.getProperties().get(KEY_SYNC_PROCESS_NAME));
scheduledJobForeignKeyType = ValueUtils.getValueAsString(codeReferenceWithProperties.getProperties().get(KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE));
if(!StringUtils.hasContent(tableName))
{
LOG.warn("Missing property under KEY_TABLE_NAME [" + KEY_TABLE_NAME + "] in codeReference for BaseSyncToScheduledJobTableCustomizer");
}
if(!StringUtils.hasContent(syncProcessName))
{
LOG.warn("Missing property under KEY_SYNC_PROCESS_NAME [" + KEY_SYNC_PROCESS_NAME + "] in codeReference for BaseSyncToScheduledJobTableCustomizer");
}
if(!StringUtils.hasContent(scheduledJobForeignKeyType))
{
LOG.warn("Missing property under KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE [" + KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE + "] in codeReference for BaseSyncToScheduledJobTableCustomizer");
}
}
}
/***************************************************************************
**
***************************************************************************/
@Override
public List<QRecord> postInsertOrUpdate(AbstractActionInput input, List<QRecord> records, Optional<List<QRecord>> oldRecordList) throws QException
{
runSyncProcessForRecordList(records, syncProcessName);
return records;
}
/***************************************************************************
**
***************************************************************************/
@Override
public List<QRecord> postDelete(DeleteInput deleteInput, List<QRecord> records) throws QException
{
deleteScheduledJobsForRecordList(records);
return records;
}
/***************************************************************************
**
***************************************************************************/
public void runSyncProcessForRecordList(List<QRecord> records, String processName)
{
if(QContext.getQInstance().getTable(ScheduledJob.TABLE_NAME) == null)
{
LOG.info("ScheduledJob table not found, skipping scheduled job sync.");
}
String primaryKeyField = QContext.getQInstance().getTable(tableName).getPrimaryKeyField();
List<Serializable> sourceRecordIds = records.stream()
.filter(r -> CollectionUtils.nullSafeIsEmpty(r.getErrors()))
.map(r -> r.getValue(primaryKeyField))
.filter(Objects::nonNull).toList();
if(CollectionUtils.nullSafeIsEmpty(sourceRecordIds))
{
return;
}
try
{
RunProcessInput runProcessInput = new RunProcessInput();
runProcessInput.setProcessName(processName);
runProcessInput.setCallback(QProcessCallbackFactory.forPrimaryKeys("id", sourceRecordIds));
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
RunProcessOutput runProcessOutput = new RunProcessAction().execute(runProcessInput);
Serializable processSummary = runProcessOutput.getValue(StreamedETLWithFrontendProcess.FIELD_PROCESS_SUMMARY);
ProcessSummaryLineInterface.log("Sync to ScheduledJob Process Summary", processSummary, List.of(logPair("sourceTable", tableName)));
}
catch(Exception e)
{
LOG.warn("Error syncing records to scheduled jobs", e, logPair("sourceTable", tableName), logPair("sourceRecordIds", sourceRecordIds));
}
}
/***************************************************************************
**
***************************************************************************/
public void deleteScheduledJobsForRecordList(List<QRecord> records)
{
if(QContext.getQInstance().getTable(ScheduledJob.TABLE_NAME) == null)
{
LOG.info("ScheduledJob table not found, skipping scheduled job delete.");
}
List<String> sourceRecordIds = records.stream()
.filter(r -> CollectionUtils.nullSafeIsEmpty(r.getErrors()))
.map(r -> r.getValueString("id")).toList();
if(sourceRecordIds.isEmpty())
{
return;
}
///////////////////////////////////////////////////
// delete any corresponding scheduledJob records //
///////////////////////////////////////////////////
try
{
new DeleteAction().execute(new DeleteInput(ScheduledJob.TABLE_NAME).withQueryFilter(new QQueryFilter()
.withCriteria(new QFilterCriteria("foreignKeyType", QCriteriaOperator.EQUALS, getScheduledJobForeignKeyType()))
.withCriteria(new QFilterCriteria("foreignKeyValue", QCriteriaOperator.IN, sourceRecordIds))));
}
catch(Exception e)
{
LOG.warn("Error deleting scheduled jobs for scheduled records", e, logPair("sourceTable", tableName), logPair("sourceRecordIds", sourceRecordIds));
}
}
/*******************************************************************************
** Getter for tableName
*******************************************************************************/
public String getTableName()
{
return (this.tableName);
}
/*******************************************************************************
** Setter for tableName
*******************************************************************************/
public void setTableName(String tableName)
{
this.tableName = tableName;
}
/*******************************************************************************
** Fluent setter for tableName
*******************************************************************************/
public BaseSyncToScheduledJobTableCustomizer withTableName(String tableName)
{
this.tableName = tableName;
return (this);
}
/*******************************************************************************
** Getter for syncProcessName
*******************************************************************************/
public String getSyncProcessName()
{
return (this.syncProcessName);
}
/*******************************************************************************
** Setter for syncProcessName
*******************************************************************************/
public void setSyncProcessName(String syncProcessName)
{
this.syncProcessName = syncProcessName;
}
/*******************************************************************************
** Fluent setter for syncProcessName
*******************************************************************************/
public BaseSyncToScheduledJobTableCustomizer withSyncProcessName(String syncProcessName)
{
this.syncProcessName = syncProcessName;
return (this);
}
/*******************************************************************************
** Getter for KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE
*******************************************************************************/
public String getKEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE()
{
return (BaseSyncToScheduledJobTableCustomizer.KEY_SCHEDULED_JOB_FOREIGN_KEY_TYPE);
}
/*******************************************************************************
** Getter for scheduledJobForeignKeyType
*******************************************************************************/
public String getScheduledJobForeignKeyType()
{
return (this.scheduledJobForeignKeyType);
}
/*******************************************************************************
** Setter for scheduledJobForeignKeyType
*******************************************************************************/
public void setScheduledJobForeignKeyType(String scheduledJobForeignKeyType)
{
this.scheduledJobForeignKeyType = scheduledJobForeignKeyType;
}
/*******************************************************************************
** Fluent setter for scheduledJobForeignKeyType
*******************************************************************************/
public BaseSyncToScheduledJobTableCustomizer withScheduledJobForeignKeyType(String scheduledJobForeignKeyType)
{
this.scheduledJobForeignKeyType = scheduledJobForeignKeyType;
return (this);
}
}