From 19d88910b58f29317355ea5213bf1c9360c562ba Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Tue, 20 Dec 2022 10:56:59 -0600 Subject: [PATCH] Updating table sync api --- .../ChildInserterPostInsertCustomizer.java | 2 ++ .../StreamedETLWithFrontendProcess.java | 5 ++-- .../AbstractTableSyncTransformStep.java | 13 ++++---- .../tablesync/TableSyncProcess.java | 26 +++++++++++++++- .../processes/utils/RecordLookupHelper.java | 30 ++++++++++++++++--- 5 files changed, 63 insertions(+), 13 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/ChildInserterPostInsertCustomizer.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/ChildInserterPostInsertCustomizer.java index 6b731012..b94ff98d 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/ChildInserterPostInsertCustomizer.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/customizers/ChildInserterPostInsertCustomizer.java @@ -117,6 +117,7 @@ public abstract class ChildInserterPostInsertCustomizer extends AbstractPostInse insertInput.setSession(getInsertInput().getSession()); insertInput.setTableName(getChildTableName()); insertInput.setRecords(childrenToInsert); + insertInput.setTransaction(this.insertInput.getTransaction()); InsertOutput insertOutput = new InsertAction().execute(insertInput); Iterator insertedRecordIterator = insertOutput.getRecords().iterator(); @@ -148,6 +149,7 @@ public abstract class ChildInserterPostInsertCustomizer extends AbstractPostInse updateInput.setSession(getInsertInput().getSession()); updateInput.setTableName(getInsertInput().getTableName()); updateInput.setRecords(recordsToUpdate); + updateInput.setTransaction(this.insertInput.getTransaction()); new UpdateAction().execute(updateInput); return (rs); diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java index 85d91c6d..e2f57883 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLWithFrontendProcess.java @@ -379,9 +379,9 @@ public class StreamedETLWithFrontendProcess /******************************************************************************* - ** + ** Attach more input fields to the process (to its first step) *******************************************************************************/ - public Builder withPreviewStepInputFields(List fieldList) + public Builder withFields(List fieldList) { QBackendStepMetaData previewStep = processMetaData.getBackendStep(STEP_NAME_PREVIEW); for(QFieldMetaData field : fieldList) @@ -415,5 +415,6 @@ public class StreamedETLWithFrontendProcess processMetaData.setSchedule(schedule); return (this); } + } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java index e9543bbd..4b8d5758 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java @@ -163,11 +163,6 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt this.runBackendStepInput = runBackendStepInput; - if(this.recordLookupHelper == null) - { - initializeRecordLookupHelper(runBackendStepInput); - } - SyncProcessConfig config = getSyncProcessConfig(); String sourceTableKeyField = config.sourceTableKeyField; @@ -199,6 +194,11 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt .filter(v -> !"".equals(v)) .collect(Collectors.toList()); + if(this.recordLookupHelper == null) + { + initializeRecordLookupHelper(runBackendStepInput, runBackendStepInput.getRecords()); + } + /////////////////////////////////////////////////////////////////////////////////////////////////// // query to see if we already have those records in the destination (to determine insert/update) // /////////////////////////////////////////////////////////////////////////////////////////////////// @@ -208,6 +208,7 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance()); queryInput.setSession(runBackendStepInput.getSession()); queryInput.setTableName(destinationTableName); + getTransaction().ifPresent(queryInput::setTransaction); QQueryFilter filter = getExistingRecordQueryFilter(runBackendStepInput, sourceKeyList); queryInput.setFilter(filter); QueryOutput queryOutput = new QueryAction().execute(queryInput); @@ -286,7 +287,7 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt /******************************************************************************* ** If needed, init a record lookup helper for this process. *******************************************************************************/ - protected void initializeRecordLookupHelper(RunBackendStepInput runBackendStepInput) throws QException + protected void initializeRecordLookupHelper(RunBackendStepInput runBackendStepInput, List sourceRecordList) throws QException { this.recordLookupHelper = new RecordLookupHelper(runBackendStepInput); diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcess.java index 60e45da7..dd3049fe 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcess.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcess.java @@ -34,6 +34,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData; import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData; import com.kingsrook.qqq.backend.core.processes.implementations.basepull.BasepullConfiguration; import com.kingsrook.qqq.backend.core.processes.implementations.basepull.ExtractViaBasepullQueryStep; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractLoadStep; import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep; import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep; import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.LoadViaInsertOrUpdateStep; @@ -61,7 +62,7 @@ public class TableSyncProcess null, LoadViaInsertOrUpdateStep.class, Collections.emptyMap())) - .withPreviewStepInputFields(List.of( + .withFields(List.of( new QFieldMetaData(FIELD_SOURCE_TABLE_KEY_FIELD, QFieldType.STRING), new QFieldMetaData(FIELD_DESTINATION_TABLE_FOREIGN_KEY, QFieldType.STRING) )) @@ -98,6 +99,18 @@ public class TableSyncProcess + /******************************************************************************* + ** Fluent setter for loadStepClass + ** + *******************************************************************************/ + public Builder withLoadStepClass(Class loadStepClass) + { + super.withLoadStepClass(loadStepClass); + return (this); + } + + + /******************************************************************************* ** Fluent setter for transformStepClass. Note - call this method also makes ** sourceTable and destinationTable be set - by getting them from the @@ -190,6 +203,17 @@ public class TableSyncProcess + /******************************************************************************* + ** Attach more input fields to the process (to its first step) + *******************************************************************************/ + public Builder withFields(List fieldList) + { + super.withFields(fieldList); + return (this); + } + + + /******************************************************************************* ** *******************************************************************************/ diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java index a09d6bef..1f22d15a 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java @@ -30,6 +30,7 @@ import java.util.Optional; import java.util.Set; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.AbstractActionInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.utils.ValueUtils; @@ -43,8 +44,8 @@ public class RecordLookupHelper { private final AbstractActionInput actionInput; - private Map> foreignRecordMaps = new HashMap<>(); - private Set preloadedKeys = new HashSet<>(); + private Map> recordMaps = new HashMap<>(); + private Set preloadedKeys = new HashSet<>(); @@ -65,7 +66,7 @@ public class RecordLookupHelper public QRecord getRecordByKey(String tableName, String keyFieldName, Serializable key) throws QException { String mapKey = tableName + "." + keyFieldName; - Map recordMap = foreignRecordMaps.computeIfAbsent(mapKey, (k) -> new HashMap<>()); + Map recordMap = recordMaps.computeIfAbsent(mapKey, (k) -> new HashMap<>()); if(!recordMap.containsKey(key)) { @@ -81,6 +82,10 @@ public class RecordLookupHelper /******************************************************************************* ** Optimization - to pre-load the records in a single query, which would otherwise ** have to be looked up one-by-one. + ** + ** Note that when this method is called for a given pair of params (table/field), + ** a flag is set to avoid ever re-loading this pair (e.g., subsequent calls to this + ** method w/ a given input pair does a noop). *******************************************************************************/ public void preloadRecords(String tableName, String keyFieldName) throws QException { @@ -88,13 +93,30 @@ public class RecordLookupHelper if(!preloadedKeys.contains(mapKey)) { Map recordMap = GeneralProcessUtils.loadTableToMap(actionInput, tableName, keyFieldName); - foreignRecordMaps.put(mapKey, recordMap); + recordMaps.put(mapKey, recordMap); preloadedKeys.add(mapKey); } } + /******************************************************************************* + ** Optimization - to pre-load some records in a single query, which would otherwise + ** have to be looked up one-by-one. + ** + ** Note that this method is different from the overload that doesn't take a filter, + ** in that it doesn't set any flags to avoid re-running (the idea being, you'd pass + ** a unique filter in each time, so you'd always want it to re-run). + *******************************************************************************/ + public void preloadRecords(String tableName, String keyFieldName, QQueryFilter filter) throws QException + { + String mapKey = tableName + "." + keyFieldName; + Map tableMap = recordMaps.computeIfAbsent(mapKey, s -> new HashMap<>()); + tableMap.putAll(GeneralProcessUtils.loadTableToMap(actionInput, tableName, keyFieldName, filter)); + } + + + /******************************************************************************* ** Get a value from a record, by doing a lookup on the specified keyFieldName, ** for the specified key value.