mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Updating table sync api
This commit is contained in:
@ -117,6 +117,7 @@ public abstract class ChildInserterPostInsertCustomizer extends AbstractPostInse
|
|||||||
insertInput.setSession(getInsertInput().getSession());
|
insertInput.setSession(getInsertInput().getSession());
|
||||||
insertInput.setTableName(getChildTableName());
|
insertInput.setTableName(getChildTableName());
|
||||||
insertInput.setRecords(childrenToInsert);
|
insertInput.setRecords(childrenToInsert);
|
||||||
|
insertInput.setTransaction(this.insertInput.getTransaction());
|
||||||
InsertOutput insertOutput = new InsertAction().execute(insertInput);
|
InsertOutput insertOutput = new InsertAction().execute(insertInput);
|
||||||
Iterator<QRecord> insertedRecordIterator = insertOutput.getRecords().iterator();
|
Iterator<QRecord> insertedRecordIterator = insertOutput.getRecords().iterator();
|
||||||
|
|
||||||
@ -148,6 +149,7 @@ public abstract class ChildInserterPostInsertCustomizer extends AbstractPostInse
|
|||||||
updateInput.setSession(getInsertInput().getSession());
|
updateInput.setSession(getInsertInput().getSession());
|
||||||
updateInput.setTableName(getInsertInput().getTableName());
|
updateInput.setTableName(getInsertInput().getTableName());
|
||||||
updateInput.setRecords(recordsToUpdate);
|
updateInput.setRecords(recordsToUpdate);
|
||||||
|
updateInput.setTransaction(this.insertInput.getTransaction());
|
||||||
new UpdateAction().execute(updateInput);
|
new UpdateAction().execute(updateInput);
|
||||||
|
|
||||||
return (rs);
|
return (rs);
|
||||||
|
@ -379,9 +379,9 @@ public class StreamedETLWithFrontendProcess
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
** Attach more input fields to the process (to its first step)
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public Builder withPreviewStepInputFields(List<QFieldMetaData> fieldList)
|
public Builder withFields(List<QFieldMetaData> fieldList)
|
||||||
{
|
{
|
||||||
QBackendStepMetaData previewStep = processMetaData.getBackendStep(STEP_NAME_PREVIEW);
|
QBackendStepMetaData previewStep = processMetaData.getBackendStep(STEP_NAME_PREVIEW);
|
||||||
for(QFieldMetaData field : fieldList)
|
for(QFieldMetaData field : fieldList)
|
||||||
@ -415,5 +415,6 @@ public class StreamedETLWithFrontendProcess
|
|||||||
processMetaData.setSchedule(schedule);
|
processMetaData.setSchedule(schedule);
|
||||||
return (this);
|
return (this);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -163,11 +163,6 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt
|
|||||||
|
|
||||||
this.runBackendStepInput = runBackendStepInput;
|
this.runBackendStepInput = runBackendStepInput;
|
||||||
|
|
||||||
if(this.recordLookupHelper == null)
|
|
||||||
{
|
|
||||||
initializeRecordLookupHelper(runBackendStepInput);
|
|
||||||
}
|
|
||||||
|
|
||||||
SyncProcessConfig config = getSyncProcessConfig();
|
SyncProcessConfig config = getSyncProcessConfig();
|
||||||
|
|
||||||
String sourceTableKeyField = config.sourceTableKeyField;
|
String sourceTableKeyField = config.sourceTableKeyField;
|
||||||
@ -199,6 +194,11 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt
|
|||||||
.filter(v -> !"".equals(v))
|
.filter(v -> !"".equals(v))
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
if(this.recordLookupHelper == null)
|
||||||
|
{
|
||||||
|
initializeRecordLookupHelper(runBackendStepInput, runBackendStepInput.getRecords());
|
||||||
|
}
|
||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// query to see if we already have those records in the destination (to determine insert/update) //
|
// query to see if we already have those records in the destination (to determine insert/update) //
|
||||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
@ -208,6 +208,7 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt
|
|||||||
QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance());
|
QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance());
|
||||||
queryInput.setSession(runBackendStepInput.getSession());
|
queryInput.setSession(runBackendStepInput.getSession());
|
||||||
queryInput.setTableName(destinationTableName);
|
queryInput.setTableName(destinationTableName);
|
||||||
|
getTransaction().ifPresent(queryInput::setTransaction);
|
||||||
QQueryFilter filter = getExistingRecordQueryFilter(runBackendStepInput, sourceKeyList);
|
QQueryFilter filter = getExistingRecordQueryFilter(runBackendStepInput, sourceKeyList);
|
||||||
queryInput.setFilter(filter);
|
queryInput.setFilter(filter);
|
||||||
QueryOutput queryOutput = new QueryAction().execute(queryInput);
|
QueryOutput queryOutput = new QueryAction().execute(queryInput);
|
||||||
@ -286,7 +287,7 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt
|
|||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** If needed, init a record lookup helper for this process.
|
** If needed, init a record lookup helper for this process.
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
protected void initializeRecordLookupHelper(RunBackendStepInput runBackendStepInput) throws QException
|
protected void initializeRecordLookupHelper(RunBackendStepInput runBackendStepInput, List<QRecord> sourceRecordList) throws QException
|
||||||
{
|
{
|
||||||
this.recordLookupHelper = new RecordLookupHelper(runBackendStepInput);
|
this.recordLookupHelper = new RecordLookupHelper(runBackendStepInput);
|
||||||
|
|
||||||
|
@ -34,6 +34,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
|
|||||||
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.processes.implementations.basepull.BasepullConfiguration;
|
import com.kingsrook.qqq.backend.core.processes.implementations.basepull.BasepullConfiguration;
|
||||||
import com.kingsrook.qqq.backend.core.processes.implementations.basepull.ExtractViaBasepullQueryStep;
|
import com.kingsrook.qqq.backend.core.processes.implementations.basepull.ExtractViaBasepullQueryStep;
|
||||||
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractLoadStep;
|
||||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep;
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep;
|
||||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep;
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep;
|
||||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.LoadViaInsertOrUpdateStep;
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.LoadViaInsertOrUpdateStep;
|
||||||
@ -61,7 +62,7 @@ public class TableSyncProcess
|
|||||||
null,
|
null,
|
||||||
LoadViaInsertOrUpdateStep.class,
|
LoadViaInsertOrUpdateStep.class,
|
||||||
Collections.emptyMap()))
|
Collections.emptyMap()))
|
||||||
.withPreviewStepInputFields(List.of(
|
.withFields(List.of(
|
||||||
new QFieldMetaData(FIELD_SOURCE_TABLE_KEY_FIELD, QFieldType.STRING),
|
new QFieldMetaData(FIELD_SOURCE_TABLE_KEY_FIELD, QFieldType.STRING),
|
||||||
new QFieldMetaData(FIELD_DESTINATION_TABLE_FOREIGN_KEY, QFieldType.STRING)
|
new QFieldMetaData(FIELD_DESTINATION_TABLE_FOREIGN_KEY, QFieldType.STRING)
|
||||||
))
|
))
|
||||||
@ -98,6 +99,18 @@ public class TableSyncProcess
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Fluent setter for loadStepClass
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public Builder withLoadStepClass(Class<? extends AbstractLoadStep> loadStepClass)
|
||||||
|
{
|
||||||
|
super.withLoadStepClass(loadStepClass);
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Fluent setter for transformStepClass. Note - call this method also makes
|
** Fluent setter for transformStepClass. Note - call this method also makes
|
||||||
** sourceTable and destinationTable be set - by getting them from the
|
** sourceTable and destinationTable be set - by getting them from the
|
||||||
@ -190,6 +203,17 @@ public class TableSyncProcess
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Attach more input fields to the process (to its first step)
|
||||||
|
*******************************************************************************/
|
||||||
|
public Builder withFields(List<QFieldMetaData> fieldList)
|
||||||
|
{
|
||||||
|
super.withFields(fieldList);
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
@ -30,6 +30,7 @@ import java.util.Optional;
|
|||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.AbstractActionInput;
|
import com.kingsrook.qqq.backend.core.model.actions.AbstractActionInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
|
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
|
||||||
|
|
||||||
@ -43,8 +44,8 @@ public class RecordLookupHelper
|
|||||||
{
|
{
|
||||||
private final AbstractActionInput actionInput;
|
private final AbstractActionInput actionInput;
|
||||||
|
|
||||||
private Map<String, Map<Serializable, QRecord>> foreignRecordMaps = new HashMap<>();
|
private Map<String, Map<Serializable, QRecord>> recordMaps = new HashMap<>();
|
||||||
private Set<String> preloadedKeys = new HashSet<>();
|
private Set<String> preloadedKeys = new HashSet<>();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -65,7 +66,7 @@ public class RecordLookupHelper
|
|||||||
public QRecord getRecordByKey(String tableName, String keyFieldName, Serializable key) throws QException
|
public QRecord getRecordByKey(String tableName, String keyFieldName, Serializable key) throws QException
|
||||||
{
|
{
|
||||||
String mapKey = tableName + "." + keyFieldName;
|
String mapKey = tableName + "." + keyFieldName;
|
||||||
Map<Serializable, QRecord> recordMap = foreignRecordMaps.computeIfAbsent(mapKey, (k) -> new HashMap<>());
|
Map<Serializable, QRecord> recordMap = recordMaps.computeIfAbsent(mapKey, (k) -> new HashMap<>());
|
||||||
|
|
||||||
if(!recordMap.containsKey(key))
|
if(!recordMap.containsKey(key))
|
||||||
{
|
{
|
||||||
@ -81,6 +82,10 @@ public class RecordLookupHelper
|
|||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Optimization - to pre-load the records in a single query, which would otherwise
|
** Optimization - to pre-load the records in a single query, which would otherwise
|
||||||
** have to be looked up one-by-one.
|
** have to be looked up one-by-one.
|
||||||
|
**
|
||||||
|
** Note that when this method is called for a given pair of params (table/field),
|
||||||
|
** a flag is set to avoid ever re-loading this pair (e.g., subsequent calls to this
|
||||||
|
** method w/ a given input pair does a noop).
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public void preloadRecords(String tableName, String keyFieldName) throws QException
|
public void preloadRecords(String tableName, String keyFieldName) throws QException
|
||||||
{
|
{
|
||||||
@ -88,13 +93,30 @@ public class RecordLookupHelper
|
|||||||
if(!preloadedKeys.contains(mapKey))
|
if(!preloadedKeys.contains(mapKey))
|
||||||
{
|
{
|
||||||
Map<Serializable, QRecord> recordMap = GeneralProcessUtils.loadTableToMap(actionInput, tableName, keyFieldName);
|
Map<Serializable, QRecord> recordMap = GeneralProcessUtils.loadTableToMap(actionInput, tableName, keyFieldName);
|
||||||
foreignRecordMaps.put(mapKey, recordMap);
|
recordMaps.put(mapKey, recordMap);
|
||||||
preloadedKeys.add(mapKey);
|
preloadedKeys.add(mapKey);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Optimization - to pre-load some records in a single query, which would otherwise
|
||||||
|
** have to be looked up one-by-one.
|
||||||
|
**
|
||||||
|
** Note that this method is different from the overload that doesn't take a filter,
|
||||||
|
** in that it doesn't set any flags to avoid re-running (the idea being, you'd pass
|
||||||
|
** a unique filter in each time, so you'd always want it to re-run).
|
||||||
|
*******************************************************************************/
|
||||||
|
public void preloadRecords(String tableName, String keyFieldName, QQueryFilter filter) throws QException
|
||||||
|
{
|
||||||
|
String mapKey = tableName + "." + keyFieldName;
|
||||||
|
Map<Serializable, QRecord> tableMap = recordMaps.computeIfAbsent(mapKey, s -> new HashMap<>());
|
||||||
|
tableMap.putAll(GeneralProcessUtils.loadTableToMap(actionInput, tableName, keyFieldName, filter));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Get a value from a record, by doing a lookup on the specified keyFieldName,
|
** Get a value from a record, by doing a lookup on the specified keyFieldName,
|
||||||
** for the specified key value.
|
** for the specified key value.
|
||||||
|
Reference in New Issue
Block a user