diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java index 0e3d21a7..62ea1d33 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/AbstractTableSyncTransformStep.java @@ -25,13 +25,16 @@ package com.kingsrook.qqq.backend.core.processes.implementations.tablesync; import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator; import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.logging.QLogger; import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine; import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; @@ -53,6 +56,7 @@ import com.kingsrook.qqq.backend.core.utils.CollectionUtils; import com.kingsrook.qqq.backend.core.utils.Pair; import com.kingsrook.qqq.backend.core.utils.StringUtils; import com.kingsrook.qqq.backend.core.utils.ValueUtils; +import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair; /******************************************************************************* @@ -63,6 +67,8 @@ import com.kingsrook.qqq.backend.core.utils.ValueUtils; *******************************************************************************/ public abstract class AbstractTableSyncTransformStep extends AbstractTransformStep { + private static final QLogger LOG = QLogger.getLogger(AbstractTableSyncTransformStep.class); + private ProcessSummaryLine okToInsert = StandardProcessSummaryLineProducer.getOkToInsertLine(); private ProcessSummaryLine okToUpdate = StandardProcessSummaryLineProducer.getOkToUpdateLine(); private ProcessSummaryLine errorMissingKeyField = new ProcessSummaryLine(Status.ERROR) @@ -217,10 +223,17 @@ public abstract class AbstractTableSyncTransformStep extends AbstractTransformSt ///////////////////////////////////////////////////////////////// // foreach source record, build the record we'll insert/update // ///////////////////////////////////////////////////////////////// - QFieldMetaData destinationForeignKeyField = runBackendStepInput.getInstance().getTable(destinationTableName).getField(destinationTableForeignKeyField); + QFieldMetaData destinationForeignKeyField = runBackendStepInput.getInstance().getTable(destinationTableName).getField(destinationTableForeignKeyField); + Set processedSourceKeys = new HashSet<>(); for(QRecord sourceRecord : runBackendStepInput.getRecords()) { Serializable sourceKeyValue = sourceRecord.getValue(sourceTableKeyField); + if(processedSourceKeys.contains(sourceKeyValue)) + { + LOG.info("Skipping duplicated source-key within page", logPair("key", sourceKeyValue)); + continue; + } + processedSourceKeys.add(sourceKeyValue); if(sourceKeyValue == null || "".equals(sourceKeyValue)) { diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcessTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcessTest.java index cb189e2c..dbfba6d1 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcessTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/tablesync/TableSyncProcessTest.java @@ -32,6 +32,7 @@ import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface; import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; import com.kingsrook.qqq.backend.core.model.data.QRecord; @@ -39,6 +40,8 @@ import com.kingsrook.qqq.backend.core.model.metadata.QInstance; import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType; import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractExtractStep; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep; import com.kingsrook.qqq.backend.core.processes.utils.GeneralProcessUtils; import com.kingsrook.qqq.backend.core.utils.TestUtils; import org.junit.jupiter.api.Test; @@ -51,6 +54,10 @@ import static org.junit.jupiter.api.Assertions.assertEquals; *******************************************************************************/ class TableSyncProcessTest extends BaseTest { + String PROCESS_NAME = "testSyncProcess"; + String TABLE_NAME_PEOPLE_SYNC = "peopleSync"; + + /******************************************************************************* ** @@ -59,10 +66,49 @@ class TableSyncProcessTest extends BaseTest void test() throws Exception { QInstance qInstance = QContext.getQInstance(); + setupDataAndMetaData(ExtractViaQueryStep.class); + RunProcessInput runProcessInput = new RunProcessInput(); + runProcessInput.setProcessName(PROCESS_NAME); + runProcessInput.addValue("recordIds", "1,2,3,4,5"); + runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); + + RunProcessAction runProcessAction = new RunProcessAction(); + RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput); + + @SuppressWarnings("unchecked") + ArrayList processResults = (ArrayList) runProcessOutput.getValues().get("processResults"); + + assertThat(processResults.get(0)) + .hasFieldOrPropertyWithValue("message", "were inserted") + .hasFieldOrPropertyWithValue("count", 3); + + assertThat(processResults.get(1)) + .hasFieldOrPropertyWithValue("message", "were updated") + .hasFieldOrPropertyWithValue("count", 2); + + List syncedRecords = TestUtils.queryTable(qInstance, TABLE_NAME_PEOPLE_SYNC); + assertEquals(5, syncedRecords.size()); + + ///////////////////////////////////////////////////////////////// + // make sure the record referencing 3 has had its name updated // + // and the one referencing 5 stayed the same // + ///////////////////////////////////////////////////////////////// + Map syncPersonsBySourceId = GeneralProcessUtils.loadTableToMap(runProcessInput, TABLE_NAME_PEOPLE_SYNC, "sourcePersonId"); + assertEquals("Tyler", syncPersonsBySourceId.get(3).getValueString("firstName")); + assertEquals("Homer", syncPersonsBySourceId.get(5).getValueString("firstName")); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private void setupDataAndMetaData(Class extractStepClass) throws QException + { + QInstance qInstance = QContext.getQInstance(); QTableMetaData personTable = qInstance.getTable(TestUtils.TABLE_NAME_PERSON_MEMORY); - String TABLE_NAME_PEOPLE_SYNC = "peopleSync"; qInstance.addTable(new QTableMetaData() .withName(TABLE_NAME_PEOPLE_SYNC) .withPrimaryKeyField("id") @@ -83,16 +129,31 @@ class TableSyncProcessTest extends BaseTest new QRecord().withValue("sourcePersonId", 5).withValue("firstName", "Homer") )); - String PROCESS_NAME = "testSyncProcess"; qInstance.addProcess(TableSyncProcess.processMetaDataBuilder(false) .withName(PROCESS_NAME) .withTableName(TestUtils.TABLE_NAME_PERSON_MEMORY) .withSyncTransformStepClass(PersonTransformClass.class) + .withExtractStepClass(extractStepClass) .getProcessMetaData()); + } + + + + /******************************************************************************* + ** Handle a case where an extract step sends duplicate records (in the same page) + ** into the transform & load steps. + ** + ** Basically the same test as above - only we have a custom Extract step, that + ** produces duplicates. + *******************************************************************************/ + @Test + void testDupesFromExtractStep() throws Exception + { + QInstance qInstance = QContext.getQInstance(); + setupDataAndMetaData(TestExtractor.class); RunProcessInput runProcessInput = new RunProcessInput(); runProcessInput.setProcessName(PROCESS_NAME); - runProcessInput.addValue("recordIds", "1,2,3,4,5"); runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); RunProcessAction runProcessAction = new RunProcessAction(); @@ -150,4 +211,24 @@ class TableSyncProcessTest extends BaseTest } + + + /******************************************************************************* + ** + *******************************************************************************/ + public static class TestExtractor extends AbstractExtractStep + { + @Override + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + List qRecords = TestUtils.queryTable(QContext.getQInstance(), TestUtils.TABLE_NAME_PERSON_MEMORY); + qRecords.forEach(r -> getRecordPipe().addRecord(r)); + + //////////////////////////////////////// + // re-add records 1 and 5 to the pipe // + //////////////////////////////////////// + getRecordPipe().addRecord(qRecords.get(0)); + getRecordPipe().addRecord(qRecords.get(4)); + } + } } \ No newline at end of file