mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Fix the case where the same source record (identified by sourceKey) is passed in the input more than once (fixes some cases of duplicates)
This commit is contained in:
@ -25,13 +25,16 @@ package com.kingsrook.qqq.backend.core.processes.implementations.tablesync;
|
|||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator;
|
import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
|
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
@ -53,6 +56,7 @@ import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
|||||||
import com.kingsrook.qqq.backend.core.utils.Pair;
|
import com.kingsrook.qqq.backend.core.utils.Pair;
|
||||||
import com.kingsrook.qqq.backend.core.utils.StringUtils;
|
import com.kingsrook.qqq.backend.core.utils.StringUtils;
|
||||||
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
|
import com.kingsrook.qqq.backend.core.utils.ValueUtils;
|
||||||
|
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -63,6 +67,8 @@ import com.kingsrook.qqq.backend.core.utils.ValueUtils;
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public abstract class AbstractTableSyncTransformStep extends AbstractTransformStep
|
public abstract class AbstractTableSyncTransformStep extends AbstractTransformStep
|
||||||
{
|
{
|
||||||
|
private static final QLogger LOG = QLogger.getLogger(AbstractTableSyncTransformStep.class);
|
||||||
|
|
||||||
private ProcessSummaryLine okToInsert = StandardProcessSummaryLineProducer.getOkToInsertLine();
|
private ProcessSummaryLine okToInsert = StandardProcessSummaryLineProducer.getOkToInsertLine();
|
||||||
private ProcessSummaryLine okToUpdate = StandardProcessSummaryLineProducer.getOkToUpdateLine();
|
private ProcessSummaryLine okToUpdate = StandardProcessSummaryLineProducer.getOkToUpdateLine();
|
||||||
private ProcessSummaryLine errorMissingKeyField = new ProcessSummaryLine(Status.ERROR)
|
private ProcessSummaryLine errorMissingKeyField = new ProcessSummaryLine(Status.ERROR)
|
||||||
@ -217,10 +223,17 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt
|
|||||||
/////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////
|
||||||
// foreach source record, build the record we'll insert/update //
|
// foreach source record, build the record we'll insert/update //
|
||||||
/////////////////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////////////////
|
||||||
QFieldMetaData destinationForeignKeyField = runBackendStepInput.getInstance().getTable(destinationTableName).getField(destinationTableForeignKeyField);
|
QFieldMetaData destinationForeignKeyField = runBackendStepInput.getInstance().getTable(destinationTableName).getField(destinationTableForeignKeyField);
|
||||||
|
Set<Serializable> processedSourceKeys = new HashSet<>();
|
||||||
for(QRecord sourceRecord : runBackendStepInput.getRecords())
|
for(QRecord sourceRecord : runBackendStepInput.getRecords())
|
||||||
{
|
{
|
||||||
Serializable sourceKeyValue = sourceRecord.getValue(sourceTableKeyField);
|
Serializable sourceKeyValue = sourceRecord.getValue(sourceTableKeyField);
|
||||||
|
if(processedSourceKeys.contains(sourceKeyValue))
|
||||||
|
{
|
||||||
|
LOG.info("Skipping duplicated source-key within page", logPair("key", sourceKeyValue));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
processedSourceKeys.add(sourceKeyValue);
|
||||||
|
|
||||||
if(sourceKeyValue == null || "".equals(sourceKeyValue))
|
if(sourceKeyValue == null || "".equals(sourceKeyValue))
|
||||||
{
|
{
|
||||||
|
@ -32,6 +32,7 @@ import com.kingsrook.qqq.backend.core.context.QContext;
|
|||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
@ -39,6 +40,8 @@ import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
|||||||
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
|
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||||
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractExtractStep;
|
||||||
|
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep;
|
||||||
import com.kingsrook.qqq.backend.core.processes.utils.GeneralProcessUtils;
|
import com.kingsrook.qqq.backend.core.processes.utils.GeneralProcessUtils;
|
||||||
import com.kingsrook.qqq.backend.core.utils.TestUtils;
|
import com.kingsrook.qqq.backend.core.utils.TestUtils;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
@ -51,6 +54,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
class TableSyncProcessTest extends BaseTest
|
class TableSyncProcessTest extends BaseTest
|
||||||
{
|
{
|
||||||
|
String PROCESS_NAME = "testSyncProcess";
|
||||||
|
String TABLE_NAME_PEOPLE_SYNC = "peopleSync";
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
@ -59,10 +66,49 @@ class TableSyncProcessTest extends BaseTest
|
|||||||
void test() throws Exception
|
void test() throws Exception
|
||||||
{
|
{
|
||||||
QInstance qInstance = QContext.getQInstance();
|
QInstance qInstance = QContext.getQInstance();
|
||||||
|
setupDataAndMetaData(ExtractViaQueryStep.class);
|
||||||
|
|
||||||
|
RunProcessInput runProcessInput = new RunProcessInput();
|
||||||
|
runProcessInput.setProcessName(PROCESS_NAME);
|
||||||
|
runProcessInput.addValue("recordIds", "1,2,3,4,5");
|
||||||
|
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
|
||||||
|
|
||||||
|
RunProcessAction runProcessAction = new RunProcessAction();
|
||||||
|
RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput);
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
ArrayList<ProcessSummaryLineInterface> processResults = (ArrayList<ProcessSummaryLineInterface>) runProcessOutput.getValues().get("processResults");
|
||||||
|
|
||||||
|
assertThat(processResults.get(0))
|
||||||
|
.hasFieldOrPropertyWithValue("message", "were inserted")
|
||||||
|
.hasFieldOrPropertyWithValue("count", 3);
|
||||||
|
|
||||||
|
assertThat(processResults.get(1))
|
||||||
|
.hasFieldOrPropertyWithValue("message", "were updated")
|
||||||
|
.hasFieldOrPropertyWithValue("count", 2);
|
||||||
|
|
||||||
|
List<QRecord> syncedRecords = TestUtils.queryTable(qInstance, TABLE_NAME_PEOPLE_SYNC);
|
||||||
|
assertEquals(5, syncedRecords.size());
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////////
|
||||||
|
// make sure the record referencing 3 has had its name updated //
|
||||||
|
// and the one referencing 5 stayed the same //
|
||||||
|
/////////////////////////////////////////////////////////////////
|
||||||
|
Map<Serializable, QRecord> syncPersonsBySourceId = GeneralProcessUtils.loadTableToMap(runProcessInput, TABLE_NAME_PEOPLE_SYNC, "sourcePersonId");
|
||||||
|
assertEquals("Tyler", syncPersonsBySourceId.get(3).getValueString("firstName"));
|
||||||
|
assertEquals("Homer", syncPersonsBySourceId.get(5).getValueString("firstName"));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private void setupDataAndMetaData(Class<? extends AbstractExtractStep> extractStepClass) throws QException
|
||||||
|
{
|
||||||
|
QInstance qInstance = QContext.getQInstance();
|
||||||
QTableMetaData personTable = qInstance.getTable(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
QTableMetaData personTable = qInstance.getTable(TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
|
|
||||||
String TABLE_NAME_PEOPLE_SYNC = "peopleSync";
|
|
||||||
qInstance.addTable(new QTableMetaData()
|
qInstance.addTable(new QTableMetaData()
|
||||||
.withName(TABLE_NAME_PEOPLE_SYNC)
|
.withName(TABLE_NAME_PEOPLE_SYNC)
|
||||||
.withPrimaryKeyField("id")
|
.withPrimaryKeyField("id")
|
||||||
@ -83,16 +129,31 @@ class TableSyncProcessTest extends BaseTest
|
|||||||
new QRecord().withValue("sourcePersonId", 5).withValue("firstName", "Homer")
|
new QRecord().withValue("sourcePersonId", 5).withValue("firstName", "Homer")
|
||||||
));
|
));
|
||||||
|
|
||||||
String PROCESS_NAME = "testSyncProcess";
|
|
||||||
qInstance.addProcess(TableSyncProcess.processMetaDataBuilder(false)
|
qInstance.addProcess(TableSyncProcess.processMetaDataBuilder(false)
|
||||||
.withName(PROCESS_NAME)
|
.withName(PROCESS_NAME)
|
||||||
.withTableName(TestUtils.TABLE_NAME_PERSON_MEMORY)
|
.withTableName(TestUtils.TABLE_NAME_PERSON_MEMORY)
|
||||||
.withSyncTransformStepClass(PersonTransformClass.class)
|
.withSyncTransformStepClass(PersonTransformClass.class)
|
||||||
|
.withExtractStepClass(extractStepClass)
|
||||||
.getProcessMetaData());
|
.getProcessMetaData());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Handle a case where an extract step sends duplicate records (in the same page)
|
||||||
|
** into the transform & load steps.
|
||||||
|
**
|
||||||
|
** Basically the same test as above - only we have a custom Extract step, that
|
||||||
|
** produces duplicates.
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testDupesFromExtractStep() throws Exception
|
||||||
|
{
|
||||||
|
QInstance qInstance = QContext.getQInstance();
|
||||||
|
setupDataAndMetaData(TestExtractor.class);
|
||||||
|
|
||||||
RunProcessInput runProcessInput = new RunProcessInput();
|
RunProcessInput runProcessInput = new RunProcessInput();
|
||||||
runProcessInput.setProcessName(PROCESS_NAME);
|
runProcessInput.setProcessName(PROCESS_NAME);
|
||||||
runProcessInput.addValue("recordIds", "1,2,3,4,5");
|
|
||||||
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
|
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
|
||||||
|
|
||||||
RunProcessAction runProcessAction = new RunProcessAction();
|
RunProcessAction runProcessAction = new RunProcessAction();
|
||||||
@ -150,4 +211,24 @@ class TableSyncProcessTest extends BaseTest
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public static class TestExtractor extends AbstractExtractStep
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
|
{
|
||||||
|
List<QRecord> qRecords = TestUtils.queryTable(QContext.getQInstance(), TestUtils.TABLE_NAME_PERSON_MEMORY);
|
||||||
|
qRecords.forEach(r -> getRecordPipe().addRecord(r));
|
||||||
|
|
||||||
|
////////////////////////////////////////
|
||||||
|
// re-add records 1 and 5 to the pipe //
|
||||||
|
////////////////////////////////////////
|
||||||
|
getRecordPipe().addRecord(qRecords.get(0));
|
||||||
|
getRecordPipe().addRecord(qRecords.get(4));
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Reference in New Issue
Block a user