Add TableSyncProcess

This commit is contained in:
2022-11-11 12:28:44 -06:00
parent a5ec33b51b
commit 8b31cee890
20 changed files with 1138 additions and 29 deletions

View File

@ -414,6 +414,9 @@ public class QPossibleValueTranslator
QueryOutput queryOutput = new QueryAction().execute(queryInput);
///////////////////////////////////////////////////////////////////////////////////
// for all records that were found, put a formatted value into cache foreach PVS //
///////////////////////////////////////////////////////////////////////////////////
for(QRecord record : queryOutput.getRecords())
{
Serializable pkeyValue = record.getValue(primaryKeyField);
@ -423,6 +426,20 @@ public class QPossibleValueTranslator
possibleValueCache.get(possibleValueSource.getName()).put(pkeyValue, formatPossibleValue(possibleValueSource, possibleValue));
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////
// for all pkeys that were NOT found, put a null value into cache foreach PVS (to avoid re-looking up) //
/////////////////////////////////////////////////////////////////////////////////////////////////////////
for(Serializable pkey : page)
{
for(QPossibleValueSource possibleValueSource : possibleValueSources)
{
if(!possibleValueCache.get(possibleValueSource.getName()).containsKey(pkey))
{
possibleValueCache.get(possibleValueSource.getName()).put(pkey, null);
}
}
}
}
}
catch(Exception e)

View File

@ -36,11 +36,13 @@ import java.util.stream.Stream;
import com.kingsrook.qqq.backend.core.actions.automation.RecordAutomationHandler;
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
import com.kingsrook.qqq.backend.core.actions.scripts.TestScriptActionInterface;
import com.kingsrook.qqq.backend.core.actions.values.QCustomPossibleValueProvider;
import com.kingsrook.qqq.backend.core.exceptions.QInstanceValidationException;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterOrderBy;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.metadata.QBackendMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeType;
@ -55,6 +57,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.reporting.QReportDataSource;
import com.kingsrook.qqq.backend.core.model.metadata.reporting.QReportView;
import com.kingsrook.qqq.backend.core.model.metadata.tables.AssociatedScript;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QFieldSection;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.Tier;
@ -222,24 +225,34 @@ public class QInstanceValidator
*******************************************************************************/
private void validateTables(QInstance qInstance)
{
if(assertCondition(CollectionUtils.nullSafeHasContents(qInstance.getTables()),
"At least 1 table must be defined."))
if(assertCondition(CollectionUtils.nullSafeHasContents(qInstance.getTables()), "At least 1 table must be defined."))
{
qInstance.getTables().forEach((tableName, table) ->
{
assertCondition(Objects.equals(tableName, table.getName()), "Inconsistent naming for table: " + tableName + "/" + table.getName() + ".");
validateAppChildHasValidParentAppName(qInstance, table);
////////////////////////////////////////
// validate the backend for the table //
////////////////////////////////////////
if(assertCondition(StringUtils.hasContent(table.getBackendName()),
"Missing backend name for table " + tableName + "."))
if(assertCondition(StringUtils.hasContent(table.getBackendName()), "Missing backend name for table " + tableName + "."))
{
if(CollectionUtils.nullSafeHasContents(qInstance.getBackends()))
{
assertCondition(qInstance.getBackendForTable(tableName) != null, "Unrecognized backend " + table.getBackendName() + " for table " + tableName + ".");
QBackendMetaData backendForTable = qInstance.getBackendForTable(tableName);
if(assertCondition(backendForTable != null, "Unrecognized backend " + table.getBackendName() + " for table " + tableName + "."))
{
////////////////////////////////////////////////////////////
// if the backend requires primary keys, then validate it //
////////////////////////////////////////////////////////////
if(backendForTable.requiresPrimaryKeyOnTables())
{
if(assertCondition(StringUtils.hasContent(table.getPrimaryKeyField()), "Missing primary key for table: " + tableName))
{
assertNoException(() -> table.getField(table.getPrimaryKeyField()), "Primary key for table " + tableName + " is not a recognized field on this table.");
}
}
}
}
}
@ -329,12 +342,49 @@ public class QInstanceValidator
{
validateTableUniqueKeys(table);
}
/////////////////////////////////////////////
// validate the table's associated scripts //
/////////////////////////////////////////////
if(table.getAssociatedScripts() != null)
{
validateAssociatedScripts(table);
}
});
}
}
/*******************************************************************************
**
*******************************************************************************/
private void validateAssociatedScripts(QTableMetaData table)
{
Set<String> usedFieldNames = new HashSet<>();
for(AssociatedScript associatedScript : table.getAssociatedScripts())
{
if(assertCondition(StringUtils.hasContent(associatedScript.getFieldName()), "Table " + table.getName() + " has an associatedScript without a fieldName"))
{
assertCondition(!usedFieldNames.contains(associatedScript.getFieldName()), "Table " + table.getName() + " has more than one associatedScript specifying field: " + associatedScript.getFieldName());
usedFieldNames.add(associatedScript.getFieldName());
assertNoException(() -> table.getField(associatedScript.getFieldName()), "Table " + table.getName() + " has an associatedScript specifying an unrecognized field: " + associatedScript.getFieldName());
}
assertCondition(associatedScript.getScriptTypeId() != null, "Table " + table.getName() + " associatedScript on field " + associatedScript.getFieldName() + " is missing a scriptTypeId");
if(associatedScript.getScriptTester() != null)
{
String prefix = "Table " + table.getName() + " associatedScript on field " + associatedScript.getFieldName();
if(preAssertionsForCodeReference(associatedScript.getScriptTester(), prefix))
{
validateSimpleCodeReference(prefix, associatedScript.getScriptTester(), TestScriptActionInterface.class);
}
}
}
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -106,6 +106,7 @@ public class RunBackendStepInput extends AbstractActionInput
target.setTableName(getTableName());
target.setProcessName(getProcessName());
target.setAsyncJobCallback(getAsyncJobCallback());
target.setFrontendStepBehavior(getFrontendStepBehavior());
target.setValues(getValues());
}

View File

@ -35,6 +35,8 @@ import java.util.Locale;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.utils.ListingHash;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/*******************************************************************************
@ -42,6 +44,8 @@ import com.kingsrook.qqq.backend.core.utils.ListingHash;
*******************************************************************************/
public interface QRecordEnum
{
Logger LOG = LogManager.getLogger(QRecordEnum.class);
ListingHash<Class<? extends QRecordEnum>, QRecordEntityField> fieldMapping = new ListingHash<>();
@ -140,9 +144,9 @@ public interface QRecordEnum
}
else
{
if(!method.getName().equals("getClass"))
if(!method.getName().equals("getClass") && !method.getName().equals("getDeclaringClass") && !method.getName().equals("getPossibleValueId"))
{
System.err.println("Method [" + method.getName() + "] looks like a getter, but its return type, [" + method.getReturnType() + "], isn't supported.");
LOG.debug("Method [" + method.getName() + "] looks like a getter, but its return type, [" + method.getReturnType() + "], isn't supported.");
}
}
}

View File

@ -60,6 +60,16 @@ public class QBackendMetaData
/*******************************************************************************
**
*******************************************************************************/
public boolean requiresPrimaryKeyOnTables()
{
return (true);
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -22,6 +22,7 @@
package com.kingsrook.qqq.backend.core.modules.backend.implementations.enumeration;
import com.kingsrook.qqq.backend.core.actions.interfaces.CountInterface;
import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableBackendDetails;
import com.kingsrook.qqq.backend.core.modules.backend.QBackendModuleInterface;
@ -68,4 +69,15 @@ public class EnumerationBackendModule implements QBackendModuleInterface
return new EnumerationQueryAction();
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public CountInterface getCountInterface()
{
return new EnumerationCountAction();
}
}

View File

@ -0,0 +1,57 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.modules.backend.implementations.enumeration;
import com.kingsrook.qqq.backend.core.actions.interfaces.CountInterface;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.count.CountOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
/*******************************************************************************
**
*******************************************************************************/
public class EnumerationCountAction implements CountInterface
{
/*******************************************************************************
**
*******************************************************************************/
@Override
public CountOutput execute(CountInput countInput) throws QException
{
QueryInput queryInput = new QueryInput(countInput.getInstance());
queryInput.setSession(countInput.getSession());
queryInput.setTableName(countInput.getTableName());
queryInput.setFilter(countInput.getFilter());
QueryOutput queryOutput = new QueryAction().execute(queryInput);
CountOutput countOutput = new CountOutput();
countOutput.setCount(queryOutput.getRecords().size());
return (countOutput);
}
}

View File

@ -273,6 +273,14 @@ public class BackendQueryFilterUtils
*******************************************************************************/
private static boolean testIn(QFilterCriteria criterion, Serializable value)
{
if(CollectionUtils.nullSafeHasContents(criterion.getValues()))
{
if(criterion.getValues().get(0) instanceof String && value instanceof Number)
{
value = String.valueOf(value);
}
}
if(!criterion.getValues().contains(value))
{
return (false);

View File

@ -373,5 +373,21 @@ public class StreamedETLWithFrontendProcess
return (this);
}
/*******************************************************************************
**
*******************************************************************************/
public Builder withPreviewStepInputFields(List<QFieldMetaData> fieldList)
{
QBackendStepMetaData previewStep = processMetaData.getBackendStep(STEP_NAME_PREVIEW);
for(QFieldMetaData field : fieldList)
{
previewStep.getInputMetaData().withField(field);
}
return (this);
}
}
}

View File

@ -0,0 +1,80 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.general;
import java.util.ArrayList;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
import static com.kingsrook.qqq.backend.core.model.actions.processes.Status.OK;
/*******************************************************************************
** Helper for working with process summary lines
*******************************************************************************/
public class StandardProcessSummaryLineProducer
{
/*******************************************************************************
** Make a line that'll say " {will be/was/were} inserted"
*******************************************************************************/
public static ProcessSummaryLine getOkToInsertLine()
{
return new ProcessSummaryLine(OK)
.withMessageSuffix(" inserted")
.withSingularFutureMessage("will be")
.withPluralFutureMessage("will be")
.withSingularPastMessage("was")
.withPluralPastMessage("were");
}
/*******************************************************************************
** Make a line that'll say " {will be/was/were} updated"
*******************************************************************************/
public static ProcessSummaryLine getOkToUpdateLine()
{
return new ProcessSummaryLine(OK)
.withMessageSuffix(" updated")
.withSingularFutureMessage("will be")
.withPluralFutureMessage("will be")
.withSingularPastMessage("was")
.withPluralPastMessage("were");
}
/*******************************************************************************
** one-liner for implementing getProcessSummary - just pass your lines in as varargs as in:
** return (StandardProcessSummaryLineProducer.toArrayList(okToInsert, okToUpdate));
*******************************************************************************/
public static ArrayList<ProcessSummaryLineInterface> toArrayList(ProcessSummaryLine... lines)
{
ArrayList<ProcessSummaryLineInterface> rs = new ArrayList<>();
for(ProcessSummaryLine line : lines)
{
line.addSelfToListIfAnyCount(rs);
}
return (rs);
}
}

View File

@ -0,0 +1,230 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.tablesync;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.Status;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
import com.kingsrook.qqq.backend.core.processes.implementations.general.StandardProcessSummaryLineProducer;
import com.kingsrook.qqq.backend.core.processes.utils.GeneralProcessUtils;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
/*******************************************************************************
** This class is for transforming records from a Source table to a Destination table.
**
** The Source table has a (unique/primary) key field: sourceTableKeyField,
** Which is matched against the Destination table's foreign-key: destinationTableForeignKeyField
*******************************************************************************/
public abstract class AbstractTableSyncTransformStep extends AbstractTransformStep
{
private ProcessSummaryLine okToInsert = StandardProcessSummaryLineProducer.getOkToInsertLine();
private ProcessSummaryLine okToUpdate = StandardProcessSummaryLineProducer.getOkToUpdateLine();
private ProcessSummaryLine errorMissingKeyField = new ProcessSummaryLine(Status.ERROR)
.withMessageSuffix("missing a value for the key field.")
.withSingularFutureMessage("will not be synced, because it is ")
.withPluralFutureMessage("will not be synced, because they are ")
.withSingularPastMessage("was not synced, because it is ")
.withPluralPastMessage("were not synced, because they are ");
private RunBackendStepInput runBackendStepInput = null;
private QPossibleValueTranslator possibleValueTranslator;
private Map<String, Map<Serializable, QRecord>> tableMaps = new HashMap<>();
/*******************************************************************************
**
*******************************************************************************/
protected QRecord getRecord(String tableName, String fieldName, Serializable value) throws QException
{
if(!tableMaps.containsKey(tableName))
{
Map<Serializable, QRecord> recordMap = GeneralProcessUtils.loadTableToMap(runBackendStepInput, tableName, fieldName);
tableMaps.put(tableName, recordMap);
}
return (tableMaps.get(tableName).get(value));
}
/*******************************************************************************
**
*******************************************************************************/
protected Serializable getRecordField(String tableName, String fieldName, Serializable value, String outputField) throws QException
{
QRecord record = getRecord(tableName, fieldName, value);
if(record == null)
{
return (null);
}
return (record.getValue(outputField));
}
/*******************************************************************************
**
*******************************************************************************/
@Override
public ArrayList<ProcessSummaryLineInterface> getProcessSummary(RunBackendStepOutput runBackendStepOutput, boolean isForResultScreen)
{
return (StandardProcessSummaryLineProducer.toArrayList(okToInsert, okToUpdate, errorMissingKeyField));
}
/*******************************************************************************
**
*******************************************************************************/
public abstract QRecord populateRecordToStore(RunBackendStepInput runBackendStepInput, QRecord destinationRecord, QRecord sourceRecord) throws QException;
/*******************************************************************************
**
*******************************************************************************/
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
if(CollectionUtils.nullSafeIsEmpty(runBackendStepInput.getRecords()))
{
return;
}
this.runBackendStepInput = runBackendStepInput;
String sourceTableKeyField = runBackendStepInput.getValueString(TableSyncProcess.FIELD_SOURCE_TABLE_KEY_FIELD);
String destinationTableForeignKeyField = runBackendStepInput.getValueString(TableSyncProcess.FIELD_DESTINATION_TABLE_FOREIGN_KEY);
String destinationTableName = runBackendStepInput.getValueString(StreamedETLWithFrontendProcess.FIELD_DESTINATION_TABLE);
//////////////////////////////////////
// extract keys from source records //
//////////////////////////////////////
List<Serializable> sourceKeyList = runBackendStepInput.getRecords().stream()
.map(r -> r.getValueString(sourceTableKeyField))
.filter(Objects::nonNull)
.filter(v -> !"".equals(v))
.collect(Collectors.toList());
///////////////////////////////////////////////////////////////////////////////////////////////////
// query to see if we already have those records in the destination (to determine insert/update) //
///////////////////////////////////////////////////////////////////////////////////////////////////
Map<Serializable, QRecord> existingRecordsByForeignKey = Collections.emptyMap();
if(!sourceKeyList.isEmpty())
{
QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance());
queryInput.setSession(runBackendStepInput.getSession());
queryInput.setTableName(destinationTableName);
queryInput.setFilter(new QQueryFilter()
.withCriteria(new QFilterCriteria(destinationTableForeignKeyField, QCriteriaOperator.IN, sourceKeyList))
);
QueryOutput queryOutput = new QueryAction().execute(queryInput);
existingRecordsByForeignKey = CollectionUtils.recordsToMap(queryOutput.getRecords(), destinationTableForeignKeyField);
}
/////////////////////////////////////////////////////////////////
// foreach source record, build the record we'll insert/update //
/////////////////////////////////////////////////////////////////
for(QRecord sourceRecord : runBackendStepInput.getRecords())
{
Serializable sourceKeyValue = sourceRecord.getValue(sourceTableKeyField);
QRecord existingRecord = existingRecordsByForeignKey.get(sourceKeyValue);
if(sourceKeyValue == null || "".equals(sourceKeyValue))
{
errorMissingKeyField.incrementCount();
try
{
errorMissingKeyField.setMessageSuffix("missing a value for the field " + runBackendStepInput.getTable().getField(sourceTableKeyField).getLabel());
}
catch(Exception e)
{
/////////////////////////////////////////
// just leave the default error suffix //
/////////////////////////////////////////
}
continue;
}
QRecord recordToStore;
if(existingRecord != null)
{
recordToStore = existingRecord;
okToUpdate.incrementCount();
}
else
{
recordToStore = new QRecord();
okToInsert.incrementCount();
}
recordToStore = populateRecordToStore(runBackendStepInput, recordToStore, sourceRecord);
runBackendStepOutput.addRecord(recordToStore);
}
////////////////////////////////////////////////
// populate possible-values for review screen //
////////////////////////////////////////////////
if(RunProcessInput.FrontendStepBehavior.BREAK.equals(runBackendStepInput.getFrontendStepBehavior()))
{
if(CollectionUtils.nullSafeHasContents(runBackendStepOutput.getRecords()))
{
if(possibleValueTranslator == null)
{
possibleValueTranslator = new QPossibleValueTranslator(runBackendStepInput.getInstance(), runBackendStepInput.getSession());
}
possibleValueTranslator.translatePossibleValuesInRecords(runBackendStepInput.getInstance().getTable(destinationTableName), runBackendStepOutput.getRecords());
}
}
}
}

View File

@ -0,0 +1,208 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.implementations.tablesync;
import java.util.Collections;
import java.util.List;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
import com.kingsrook.qqq.backend.core.model.metadata.layout.QIcon;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.processes.implementations.basepull.ExtractViaBasepullQueryStep;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.LoadViaInsertOrUpdateStep;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
/*******************************************************************************
** Definition for Standard process to sync data from one table into another.
**
*******************************************************************************/
public class TableSyncProcess
{
public static final String FIELD_SOURCE_TABLE_KEY_FIELD = "sourceTableKeyField"; // String
public static final String FIELD_DESTINATION_TABLE_FOREIGN_KEY = "destinationTableForeignKey"; // String
/*******************************************************************************
**
*******************************************************************************/
public static Builder processMetaDataBuilder(boolean isBasePull)
{
return (Builder) new Builder(StreamedETLWithFrontendProcess.defineProcessMetaData(
isBasePull ? ExtractViaBasepullQueryStep.class : ExtractViaQueryStep.class,
null,
LoadViaInsertOrUpdateStep.class,
Collections.emptyMap()))
.withPreviewStepInputFields(List.of(
new QFieldMetaData(FIELD_SOURCE_TABLE_KEY_FIELD, QFieldType.STRING),
new QFieldMetaData(FIELD_DESTINATION_TABLE_FOREIGN_KEY, QFieldType.STRING)
))
.withPreviewMessage(StreamedETLWithFrontendProcess.DEFAULT_PREVIEW_MESSAGE_FOR_INSERT_OR_UPDATE);
}
/*******************************************************************************
**
*******************************************************************************/
public static class Builder extends StreamedETLWithFrontendProcess.Builder
{
/*******************************************************************************
** Constructor
**
*******************************************************************************/
public Builder(QProcessMetaData processMetaData)
{
super(processMetaData);
}
/*******************************************************************************
** Fluent setter for sourceTableKeyField
**
*******************************************************************************/
public Builder withSourceTableKeyField(String sourceTableKeyField)
{
setInputFieldDefaultValue(FIELD_SOURCE_TABLE_KEY_FIELD, sourceTableKeyField);
return (this);
}
/*******************************************************************************
** Fluent setter for destinationTableForeignKeyField
**
*******************************************************************************/
public Builder withDestinationTableForeignKeyField(String destinationTableForeignKeyField)
{
setInputFieldDefaultValue(FIELD_DESTINATION_TABLE_FOREIGN_KEY, destinationTableForeignKeyField);
return (this);
}
/*******************************************************************************
** Fluent setter for transformStepClass
**
*******************************************************************************/
public Builder withSyncTransformStepClass(Class<? extends AbstractTableSyncTransformStep> transformStepClass)
{
setInputFieldDefaultValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE, new QCodeReference(transformStepClass));
return (this);
}
/*******************************************************************************
** Fluent setter for sourceTable
**
*******************************************************************************/
public Builder withSourceTable(String sourceTable)
{
setInputFieldDefaultValue(StreamedETLWithFrontendProcess.FIELD_SOURCE_TABLE, sourceTable);
return (this);
}
/*******************************************************************************
** Fluent setter for destinationTable
**
*******************************************************************************/
public Builder withDestinationTable(String destinationTable)
{
setInputFieldDefaultValue(StreamedETLWithFrontendProcess.FIELD_DESTINATION_TABLE, destinationTable);
return (this);
}
/*******************************************************************************
** Fluent setter for name
**
*******************************************************************************/
public Builder withName(String name)
{
processMetaData.setName(name);
return (this);
}
/*******************************************************************************
** Fluent setter for label
**
*******************************************************************************/
public Builder withLabel(String name)
{
processMetaData.setLabel(name);
return (this);
}
/*******************************************************************************
** Fluent setter for tableName
**
*******************************************************************************/
public Builder withTableName(String tableName)
{
processMetaData.setTableName(tableName);
return (this);
}
/*******************************************************************************
** Fluent setter for icon
**
*******************************************************************************/
public Builder withIcon(QIcon icon)
{
processMetaData.setIcon(icon);
return (this);
}
/*******************************************************************************
**
*******************************************************************************/
public Builder withReviewStepRecordFields(List<QFieldMetaData> fieldList)
{
QFrontendStepMetaData reviewStep = processMetaData.getFrontendStep(StreamedETLWithFrontendProcess.STEP_NAME_REVIEW);
for(QFieldMetaData fieldMetaData : fieldList)
{
reviewStep.withRecordListField(fieldMetaData);
}
return (this);
}
}
}

View File

@ -119,7 +119,7 @@ public class ScheduleManager
for(QProcessMetaData process : qInstance.getProcesses().values())
{
if(process.getSchedule() != null)
if(process.getSchedule() != null && allowedToStart(process.getName()))
{
startProcess(process);
}
@ -140,33 +140,56 @@ public class ScheduleManager
List<PollingAutomationPerTableRunner.TableActions> tableActions = PollingAutomationPerTableRunner.getTableActions(qInstance, automationProvider.getName());
for(PollingAutomationPerTableRunner.TableActions tableAction : tableActions)
{
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableAction);
StandardScheduledExecutor executor = new StandardScheduledExecutor(runner);
if(allowedToStart(tableAction.tableName()))
{
PollingAutomationPerTableRunner runner = new PollingAutomationPerTableRunner(qInstance, automationProvider.getName(), sessionSupplier, tableAction);
StandardScheduledExecutor executor = new StandardScheduledExecutor(runner);
QScheduleMetaData schedule = Objects.requireNonNullElseGet(automationProvider.getSchedule(), this::getDefaultSchedule);
QScheduleMetaData schedule = Objects.requireNonNullElseGet(automationProvider.getSchedule(), this::getDefaultSchedule);
executor.setName(runner.getName());
setScheduleInExecutor(schedule, executor);
executor.start();
executor.setName(runner.getName());
setScheduleInExecutor(schedule, executor);
executor.start();
executors.add(executor);
executors.add(executor);
}
}
}
/*******************************************************************************
**
*******************************************************************************/
private boolean allowedToStart(String name)
{
String propertyName = "qqq.scheduleManager.onlyStartNamesMatching";
String propertyValue = System.getProperty(propertyName, "");
if(propertyValue.equals(""))
{
return (true);
}
return (name.matches(propertyValue));
}
/*******************************************************************************
**
*******************************************************************************/
private void startQueueProvider(QQueueProviderMetaData queueProvider)
{
switch(queueProvider.getType())
if(allowedToStart(queueProvider.getName()))
{
case SQS:
startSqsProvider((SQSQueueProviderMetaData) queueProvider);
break;
default:
throw new IllegalArgumentException("Unhandled queue provider type: " + queueProvider.getType());
switch(queueProvider.getType())
{
case SQS:
startSqsProvider((SQSQueueProviderMetaData) queueProvider);
break;
default:
throw new IllegalArgumentException("Unhandled queue provider type: " + queueProvider.getType());
}
}
}
@ -182,7 +205,7 @@ public class ScheduleManager
for(QQueueMetaData queue : qInstance.getQueues().values())
{
if(queueProvider.getName().equals(queue.getProviderName()))
if(queueProvider.getName().equals(queue.getProviderName()) && allowedToStart(queue.getName()))
{
SQSQueuePoller sqsQueuePoller = new SQSQueuePoller();
sqsQueuePoller.setQueueProviderMetaData(queueProvider);