mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-20 22:18:43 +00:00
CE-847 Main implementation of fix for missing insert automations, by updating status to pending-update-automations when a record is still pending insert automations. added memoization of areThereTableTriggersForTableMemoization; small cleanup (remove session & instance params, pass transaction
This commit is contained in:
@ -22,30 +22,41 @@
|
||||
package com.kingsrook.qqq.backend.core.actions.automation;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.CountAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.instances.QMetaDataVariableInterpreter;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
||||
import com.kingsrook.qqq.backend.core.model.automation.TableTrigger;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTrackingType;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TableAutomationAction;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.TriggerEvent;
|
||||
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
||||
import com.kingsrook.qqq.backend.core.utils.Speaker;
|
||||
import com.kingsrook.qqq.backend.core.utils.memoization.Memoization;
|
||||
import org.apache.commons.lang.NotImplementedException;
|
||||
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -55,19 +66,37 @@ public class RecordAutomationStatusUpdater
|
||||
{
|
||||
private static final QLogger LOG = QLogger.getLogger(RecordAutomationStatusUpdater.class);
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// feature flag - by default, will be false - and before setting records to PENDING_UPDATE_AUTOMATIONS, //
|
||||
// we will fetch them, to check their current automationStatus - and if they are currently PENDING //
|
||||
// or RUNNING inserts or updates, we won't update them. This is added to fix cases where an update that //
|
||||
// comes in before insert-automations have run, will cause the pending-insert status to be missed. //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
private static boolean skipPreUpdateFetch = new QMetaDataVariableInterpreter().getBooleanFromPropertyOrEnvironment("qqq.recordAutomationStatusUpdater.skipPreUpdateFetch", "QQQ_RECORD_AUTOMATION_STATUS_UPDATER_SKIP_PRE_UPDATE_FETCH", false);
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// feature flag - by default, we'll memoize the check for triggers - but we can turn it off. //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////
|
||||
private static boolean memoizeCheckForTriggers = new QMetaDataVariableInterpreter().getBooleanFromPropertyOrEnvironment("qqq.recordAutomationStatusUpdater.memoizeCheckForTriggers", "QQQ_RECORD_AUTOMATION_STATUS_UPDATER_MEMOIZE_CHECK_FOR_TRIGGERS", true);
|
||||
|
||||
private static Memoization<Key, Boolean> areThereTableTriggersForTableMemoization = new Memoization<Key, Boolean>().withTimeout(Duration.of(60, ChronoUnit.SECONDS));
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** for a list of records from a table, set their automation status - based on
|
||||
** how the table is configured.
|
||||
*******************************************************************************/
|
||||
public static boolean setAutomationStatusInRecords(QSession session, QTableMetaData table, List<QRecord> records, AutomationStatus automationStatus)
|
||||
public static boolean setAutomationStatusInRecords(QTableMetaData table, List<QRecord> records, AutomationStatus automationStatus, QBackendTransaction transaction)
|
||||
{
|
||||
if(table == null || table.getAutomationDetails() == null || CollectionUtils.nullSafeIsEmpty(records))
|
||||
{
|
||||
return (false);
|
||||
}
|
||||
|
||||
QTableAutomationDetails automationDetails = table.getAutomationDetails();
|
||||
Set<Serializable> pkeysWeMayNotUpdate = new HashSet<>();
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// In case an automation is running, and it updates records - don't let those records be marked //
|
||||
// as PENDING_UPDATE_AUTOMATIONS... this is meant to avoid having a record's automation update //
|
||||
@ -81,12 +110,50 @@ public class RecordAutomationStatusUpdater
|
||||
for(StackTraceElement stackTraceElement : e.getStackTrace())
|
||||
{
|
||||
String className = stackTraceElement.getClassName();
|
||||
if(className.contains("com.kingsrook.qqq.backend.core.actions.automation") && !className.equals(RecordAutomationStatusUpdater.class.getName()) && !className.endsWith("Test"))
|
||||
if(className.contains(RecordAutomationStatusUpdater.class.getPackageName()) && !className.equals(RecordAutomationStatusUpdater.class.getName()) && !className.endsWith("Test") && !className.contains("Test$"))
|
||||
{
|
||||
LOG.debug("Avoiding re-setting automation status to PENDING_UPDATE while running an automation");
|
||||
return (false);
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
// if table uses field-in-table status tracking (and feature flag allows it) //
|
||||
// then look the records up before we set them to pending-updates, to avoid //
|
||||
// losing other pending or running status information. We will allow moving //
|
||||
// from OK or the 2 failed statuses into pending-updates - which seems right. //
|
||||
////////////////////////////////////////////////////////////////////////////////
|
||||
if(automationDetails.getStatusTracking() != null && AutomationStatusTrackingType.FIELD_IN_TABLE.equals(automationDetails.getStatusTracking().getType()) && !skipPreUpdateFetch)
|
||||
{
|
||||
try
|
||||
{
|
||||
List<Serializable> pkeysToLookup = records.stream().map(r -> r.getValue(table.getPrimaryKeyField())).toList();
|
||||
|
||||
List<QRecord> freshRecords = new QueryAction().execute(new QueryInput(table.getName())
|
||||
.withFilter(new QQueryFilter(new QFilterCriteria(table.getPrimaryKeyField(), QCriteriaOperator.IN, pkeysToLookup)))
|
||||
.withTransaction(transaction)
|
||||
).getRecords();
|
||||
|
||||
for(QRecord freshRecord : freshRecords)
|
||||
{
|
||||
Serializable recordStatus = freshRecord.getValue(automationDetails.getStatusTracking().getFieldName());
|
||||
if(AutomationStatus.PENDING_INSERT_AUTOMATIONS.getId().equals(recordStatus)
|
||||
|| AutomationStatus.PENDING_UPDATE_AUTOMATIONS.getId().equals(recordStatus)
|
||||
|| AutomationStatus.RUNNING_INSERT_AUTOMATIONS.getId().equals(recordStatus)
|
||||
|| AutomationStatus.RUNNING_UPDATE_AUTOMATIONS.getId().equals(recordStatus))
|
||||
{
|
||||
Serializable primaryKey = freshRecord.getValue(table.getPrimaryKeyField());
|
||||
LOG.debug("May not update automation status", logPair("table", table.getName()), logPair("id", primaryKey), logPair("currentStatus", recordStatus), logPair("requestedStatus", automationStatus.getId()));
|
||||
pkeysWeMayNotUpdate.add(primaryKey);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(QException qe)
|
||||
{
|
||||
LOG.error("Error checking existing automation status before setting new automation status - more records will be updated than maybe should be...", qe);
|
||||
Speaker.say("Avoid re-set todd update during automation");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
@ -98,19 +165,15 @@ public class RecordAutomationStatusUpdater
|
||||
automationStatus = AutomationStatus.OK;
|
||||
}
|
||||
|
||||
QTableAutomationDetails automationDetails = table.getAutomationDetails();
|
||||
if(automationDetails.getStatusTracking() != null && AutomationStatusTrackingType.FIELD_IN_TABLE.equals(automationDetails.getStatusTracking().getType()))
|
||||
{
|
||||
for(QRecord record : records)
|
||||
{
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// todo - seems like there's some case here, where if an order was in PENDING_INSERT, but then some other job updated the record, that we'd //
|
||||
// lose that pending status, which would be a Bad Thing™... //
|
||||
// problem is - we may not have the full record in here, so we can't necessarily check the record to see what status it's currently in... //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
|
||||
record.setValue(automationDetails.getStatusTracking().getFieldName(), automationStatus.getId());
|
||||
// todo - another field - for the automation timestamp??
|
||||
if(!pkeysWeMayNotUpdate.contains(record.getValue(table.getPrimaryKeyField())))
|
||||
{
|
||||
record.setValue(automationDetails.getStatusTracking().getFieldName(), automationStatus.getId());
|
||||
// todo - another field - for the automation timestamp??
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -188,11 +251,29 @@ public class RecordAutomationStatusUpdater
|
||||
return (false);
|
||||
}
|
||||
|
||||
if(memoizeCheckForTriggers)
|
||||
{
|
||||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
// as within the lookup method, error on the side of "yes, maybe there are triggers" //
|
||||
///////////////////////////////////////////////////////////////////////////////////////
|
||||
Optional<Boolean> result = areThereTableTriggersForTableMemoization.getResult(new Key(table, triggerEvent), key -> lookupIfThereAreTriggersForTable(table, triggerEvent));
|
||||
return result.orElse(true);
|
||||
}
|
||||
else
|
||||
{
|
||||
return lookupIfThereAreTriggersForTable(table, triggerEvent);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
private static Boolean lookupIfThereAreTriggersForTable(QTableMetaData table, TriggerEvent triggerEvent)
|
||||
{
|
||||
try
|
||||
{
|
||||
///////////////////
|
||||
// todo - cache? //
|
||||
///////////////////
|
||||
CountInput countInput = new CountInput();
|
||||
countInput.setTableName(TableTrigger.TABLE_NAME);
|
||||
countInput.setFilter(new QQueryFilter(
|
||||
@ -207,6 +288,7 @@ public class RecordAutomationStatusUpdater
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// if the count query failed, we're a bit safer to err on the side of "yeah, there might be automations" //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
LOG.warn("Error looking if there are triggers for table", e, logPair("tableName", table.getName()));
|
||||
return (true);
|
||||
}
|
||||
}
|
||||
@ -217,12 +299,12 @@ public class RecordAutomationStatusUpdater
|
||||
** for a list of records, update their automation status and actually Update the
|
||||
** backend as well.
|
||||
*******************************************************************************/
|
||||
public static void setAutomationStatusInRecordsAndUpdate(QInstance instance, QSession session, QTableMetaData table, List<QRecord> records, AutomationStatus automationStatus) throws QException
|
||||
public static void setAutomationStatusInRecordsAndUpdate(QTableMetaData table, List<QRecord> records, AutomationStatus automationStatus, QBackendTransaction transaction) throws QException
|
||||
{
|
||||
QTableAutomationDetails automationDetails = table.getAutomationDetails();
|
||||
if(automationDetails != null && AutomationStatusTrackingType.FIELD_IN_TABLE.equals(automationDetails.getStatusTracking().getType()))
|
||||
{
|
||||
boolean didSetStatusField = setAutomationStatusInRecords(session, table, records, automationStatus);
|
||||
boolean didSetStatusField = setAutomationStatusInRecords(table, records, automationStatus, transaction);
|
||||
if(didSetStatusField)
|
||||
{
|
||||
UpdateInput updateInput = new UpdateInput();
|
||||
@ -237,6 +319,7 @@ public class RecordAutomationStatusUpdater
|
||||
.withValue(table.getPrimaryKeyField(), r.getValue(table.getPrimaryKeyField()))
|
||||
.withValue(automationDetails.getStatusTracking().getFieldName(), r.getValue(automationDetails.getStatusTracking().getFieldName()))).toList());
|
||||
updateInput.setAreAllValuesBeingUpdatedTheSame(true);
|
||||
updateInput.setTransaction(transaction);
|
||||
updateInput.setOmitDmlAudit(true);
|
||||
|
||||
new UpdateAction().execute(updateInput);
|
||||
@ -250,4 +333,8 @@ public class RecordAutomationStatusUpdater
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private record Key(QTableMetaData table, TriggerEvent triggerEvent) {}
|
||||
|
||||
}
|
||||
|
@ -251,8 +251,7 @@ public class PollingAutomationPerTableRunner implements Runnable
|
||||
|
||||
try
|
||||
{
|
||||
QSession session = sessionSupplier != null ? sessionSupplier.get() : new QSession();
|
||||
processTableInsertOrUpdate(instance.getTable(tableActions.tableName()), session, tableActions.status());
|
||||
processTableInsertOrUpdate(instance.getTable(tableActions.tableName()), tableActions.status());
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
@ -270,7 +269,7 @@ public class PollingAutomationPerTableRunner implements Runnable
|
||||
/*******************************************************************************
|
||||
** Query for and process records that have a PENDING_INSERT or PENDING_UPDATE status on a given table.
|
||||
*******************************************************************************/
|
||||
public void processTableInsertOrUpdate(QTableMetaData table, QSession session, AutomationStatus automationStatus) throws QException
|
||||
public void processTableInsertOrUpdate(QTableMetaData table, AutomationStatus automationStatus) throws QException
|
||||
{
|
||||
/////////////////////////////////////////////////////////////////////////
|
||||
// get the actions to run against this table in this automation status //
|
||||
@ -321,7 +320,7 @@ public class PollingAutomationPerTableRunner implements Runnable
|
||||
}, () ->
|
||||
{
|
||||
List<QRecord> records = recordPipe.consumeAvailableRecords();
|
||||
applyActionsToRecords(session, table, records, actions, automationStatus);
|
||||
applyActionsToRecords(table, records, actions, automationStatus);
|
||||
return (records.size());
|
||||
}
|
||||
);
|
||||
@ -427,7 +426,7 @@ public class PollingAutomationPerTableRunner implements Runnable
|
||||
** table's actions against them - IF they are found to match the action's filter
|
||||
** (assuming it has one - if it doesn't, then all records match).
|
||||
*******************************************************************************/
|
||||
private void applyActionsToRecords(QSession session, QTableMetaData table, List<QRecord> records, List<TableAutomationAction> actions, AutomationStatus automationStatus) throws QException
|
||||
private void applyActionsToRecords(QTableMetaData table, List<QRecord> records, List<TableAutomationAction> actions, AutomationStatus automationStatus) throws QException
|
||||
{
|
||||
if(CollectionUtils.nullSafeIsEmpty(records))
|
||||
{
|
||||
@ -437,7 +436,7 @@ public class PollingAutomationPerTableRunner implements Runnable
|
||||
///////////////////////////////////////////////////
|
||||
// mark the records as RUNNING their automations //
|
||||
///////////////////////////////////////////////////
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, pendingToRunningStatusMap.get(automationStatus));
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(table, records, pendingToRunningStatusMap.get(automationStatus), null);
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// foreach action - run it against the records (but only if they match the action's filter, if there is one) //
|
||||
@ -457,11 +456,11 @@ public class PollingAutomationPerTableRunner implements Runnable
|
||||
////////////////////////////////////////
|
||||
if(anyActionsFailed)
|
||||
{
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, pendingToFailedStatusMap.get(automationStatus));
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(table, records, pendingToFailedStatusMap.get(automationStatus), null);
|
||||
}
|
||||
else
|
||||
{
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(instance, session, table, records, AutomationStatus.OK);
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecordsAndUpdate(table, records, AutomationStatus.OK, null);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -444,7 +444,7 @@ public class InsertAction extends AbstractQActionFunction<InsertInput, InsertOut
|
||||
*******************************************************************************/
|
||||
private void setAutomationStatusField(InsertInput insertInput)
|
||||
{
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecords(insertInput.getSession(), insertInput.getTable(), insertInput.getRecords(), AutomationStatus.PENDING_INSERT_AUTOMATIONS);
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecords(insertInput.getTable(), insertInput.getRecords(), AutomationStatus.PENDING_INSERT_AUTOMATIONS, insertInput.getTransaction());
|
||||
}
|
||||
|
||||
|
||||
|
@ -563,7 +563,7 @@ public class UpdateAction
|
||||
*******************************************************************************/
|
||||
private void setAutomationStatusField(UpdateInput updateInput)
|
||||
{
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecords(updateInput.getSession(), updateInput.getTable(), updateInput.getRecords(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS);
|
||||
RecordAutomationStatusUpdater.setAutomationStatusInRecords(updateInput.getTable(), updateInput.getRecords(), AutomationStatus.PENDING_UPDATE_AUTOMATIONS, updateInput.getTransaction());
|
||||
}
|
||||
|
||||
}
|
||||
|
Reference in New Issue
Block a user