From 67244d6c6eec1762bad880d9d03d0368e50a196f Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Mon, 20 Feb 2023 09:43:04 -0600 Subject: [PATCH] Initial version of abstract merge duplicates process --- .../LoadViaInsertOrUpdateStep.java | 19 +- .../AbstractMergeDuplicatesTransformStep.java | 453 ++++++++++++++++++ .../MergeDuplicatesLoadStep.java | 132 +++++ .../MergeDuplicatesProcess.java | 238 +++++++++ .../MergeDuplicatesProcessTest.java | 226 +++++++++ 5 files changed, 1064 insertions(+), 4 deletions(-) create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/AbstractMergeDuplicatesTransformStep.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesLoadStep.java create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcess.java create mode 100644 qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcessTest.java diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertOrUpdateStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertOrUpdateStep.java index f1ab3544..45d09c96 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertOrUpdateStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/LoadViaInsertOrUpdateStep.java @@ -129,15 +129,26 @@ public class LoadViaInsertOrUpdateStep extends AbstractLoadStep QTableMetaData tableMetaData = runBackendStepInput.getInstance().getTable(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE)); recordsToInsert = new ArrayList<>(); recordsToUpdate = new ArrayList<>(); - for(QRecord record : runBackendStepInput.getRecords()) + + splitRecordsForInsertOrUpdate(runBackendStepInput.getRecords(), tableMetaData.getPrimaryKeyField(), recordsToInsert, recordsToUpdate); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected void splitRecordsForInsertOrUpdate(List inputList, String primaryKeyFieldName, List insertList, List updateList) + { + for(QRecord record : inputList) { - if(record.getValue(tableMetaData.getPrimaryKeyField()) == null) + if(record.getValue(primaryKeyFieldName) == null) { - recordsToInsert.add(record); + insertList.add(record); } else { - recordsToUpdate.add(record); + updateList.add(record); } } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/AbstractMergeDuplicatesTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/AbstractMergeDuplicatesTransformStep.java new file mode 100644 index 00000000..06485b7e --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/AbstractMergeDuplicatesTransformStep.java @@ -0,0 +1,453 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.mergeduplicates; + + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import com.kingsrook.qqq.backend.core.actions.tables.QueryAction; +import com.kingsrook.qqq.backend.core.context.QContext; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.logging.QLogger; +import com.kingsrook.qqq.backend.core.model.actions.audits.AuditInput; +import com.kingsrook.qqq.backend.core.model.actions.audits.AuditSingleInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine; +import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.actions.processes.Status; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep; +import com.kingsrook.qqq.backend.core.processes.implementations.general.StandardProcessSummaryLineProducer; +import com.kingsrook.qqq.backend.core.utils.CollectionUtils; +import com.kingsrook.qqq.backend.core.utils.ListingHash; +import com.kingsrook.qqq.backend.core.utils.StringUtils; + + +/******************************************************************************* + ** This class is for merging duplicate records in a table. + ** + ** We must define for the table one or more fields that we'll use to mark records as unique + *******************************************************************************/ +public abstract class AbstractMergeDuplicatesTransformStep extends AbstractTransformStep +{ + private static final QLogger LOG = QLogger.getLogger(AbstractMergeDuplicatesTransformStep.class); + + private ProcessSummaryLine okToInsert = StandardProcessSummaryLineProducer.getOkToInsertLine(); + private ProcessSummaryLine okToUpdate = StandardProcessSummaryLineProducer.getOkToUpdateLine(); + private ProcessSummaryLine okToDelete = StandardProcessSummaryLineProducer.getOkToDeleteLine(); + private ProcessSummaryLine errorMissingKeyField = new ProcessSummaryLine(Status.ERROR) + .withMessageSuffix("missing a value for the key field.") + .withSingularFutureMessage("will not be synced, because it is ") + .withPluralFutureMessage("will not be synced, because they are ") + .withSingularPastMessage("was not synced, because it is ") + .withPluralPastMessage("were not synced, because they are "); + + private ProcessSummaryLine notADuplicate = new ProcessSummaryLine(Status.INFO, "did not have any duplicates."); + private ProcessSummaryLine requestedToSkip = new ProcessSummaryLine(Status.INFO) + .withSingularFutureMessage("will be skipped, because it was not clear how it should be processed.") + .withPluralFutureMessage("will be skipped, because it was not clear how they should be processed.") + .withSingularPastMessage("was skipped, because it was not clear how it should have been processed.") + .withPluralPastMessage("were skipped, because it was not clear how they should have been processed."); + + protected RunBackendStepInput runBackendStepInput = null; + + private ListingHash otherTableIdsToDelete = new ListingHash<>(); + private ListingHash otherTableFiltersToDelete = new ListingHash<>(); + private ListingHash otherTableRecordsToStore = new ListingHash<>(); + + private AuditInput auditInput = new AuditInput(); + + private Set> keysSeenInPreviousPages = new HashSet<>(); + + + /******************************************************************************* + ** Do the main work for this process - merge a list of records. + ** May also call addOtherTableIdsToDelete, addOtherTableFilterToDelete, + ** and addOtherTableRecordsToStore + *******************************************************************************/ + public abstract QRecord buildRecordToKeep(RunBackendStepInput runBackendStepInput, List duplicateRecords) throws QException, SkipTheseRecordsException; + + + /******************************************************************************* + ** Define the config for this process - e.g., what fields & tables are used. + *******************************************************************************/ + protected abstract MergeProcessConfig getMergeProcessConfig(); + + + + /******************************************************************************* + ** Optional point where subclasses can pre-load data in-bulk on all the duplicates. + *******************************************************************************/ + protected void preProcess(ListingHash, QRecord> duplicatesMap) throws QException + { + //////////////////////// + // noop in base class // + //////////////////////// + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public ArrayList getProcessSummary(RunBackendStepOutput runBackendStepOutput, boolean isForResultScreen) + { + ArrayList processSummaryLineInterfaces = StandardProcessSummaryLineProducer.toArrayList(okToInsert, okToUpdate, okToDelete, notADuplicate, requestedToSkip, errorMissingKeyField); + + try + { + Serializable recordCount = runBackendStepOutput.getValue("recordCount"); + if(recordCount instanceof Integer recordCountInt) + { + int sum = processSummaryLineInterfaces.stream().filter(ProcessSummaryLine.class::isInstance).mapToInt(psli -> ((ProcessSummaryLine) psli).getCount()).sum(); + if(sum != recordCountInt) + { + processSummaryLineInterfaces.add(new ProcessSummaryLine(Status.INFO, null, "These counts may not add up to the number of selected records, because this process looks for duplicates of the selected records outside of what was selected.")); + } + } + } + catch(Exception e) + { + // just continue + } + + return processSummaryLineInterfaces; + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected void addOtherTableIdsToDelete(String tableName, Collection ids) + { + otherTableIdsToDelete.addAll(tableName, ids); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected void addOtherTableFilterToDelete(String tableName, QQueryFilter filter) + { + otherTableFiltersToDelete.add(tableName, filter); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + protected void addOtherTableRecordsToStore(String tableName, Collection records) + { + otherTableRecordsToStore.addAll(tableName, records); + } + + + + /******************************************************************************* + ** Record to store the config for this process - e.g., what fields & tables are used. + *******************************************************************************/ + public record MergeProcessConfig(String tableName, List uniqueKeyFieldNames, boolean doAutomaticAudits) + { + /******************************************************************************* + ** artificial method, here to make jacoco see that this class is indeed + ** included in test coverage... + ** todo call me + *******************************************************************************/ + void noop() + { + System.out.println("noop"); + } + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + if(CollectionUtils.nullSafeIsEmpty(runBackendStepInput.getRecords())) + { + return; + } + + this.runBackendStepInput = runBackendStepInput; + + //////////////////////////////////// + // clear these lists on each page // + //////////////////////////////////// + otherTableIdsToDelete.clear(); + otherTableFiltersToDelete.clear(); + otherTableRecordsToStore.clear(); + + MergeProcessConfig config = getMergeProcessConfig(); + + String tableName = config.tableName; + List uniqueKeyFieldNames = config.uniqueKeyFieldNames; + + if(!StringUtils.hasContent(tableName)) + { + throw (new IllegalStateException("Missing tableName in config for " + getClass().getSimpleName())); + } + + if(CollectionUtils.nullSafeIsEmpty(uniqueKeyFieldNames)) + { + throw (new IllegalStateException("Missing uniqueKeyFieldNames in config for " + getClass().getSimpleName())); + } + + QTableMetaData table = QContext.getQInstance().getTable(tableName); + if(table == null) + { + throw (new IllegalStateException("Unrecognized table name: " + tableName)); + } + + String primaryKeyField = table.getPrimaryKeyField(); + List uniqueKeyFields = new ArrayList<>(); + + for(String fieldName : uniqueKeyFieldNames) + { + QFieldMetaData field = table.getField(fieldName); + uniqueKeyFields.add(field); + } + + String ukLabels = StringUtils.joinWithCommasAndAnd(uniqueKeyFields.stream().map(QFieldMetaData::getLabel).toList()); + + //////////////////////////////// + // build query for duplicates // + //////////////////////////////// + QQueryFilter filter = new QQueryFilter(); + filter.setBooleanOperator(QQueryFilter.BooleanOperator.OR); + for(QRecord record : runBackendStepInput.getRecords()) + { + List ukValues = new ArrayList<>(); + QQueryFilter subFilter = new QQueryFilter(); + for(QFieldMetaData field : uniqueKeyFields) + { + ukValues.add(record.getValue(field.getName())); + subFilter.addCriteria(new QFilterCriteria(field.getName(), QCriteriaOperator.EQUALS, record.getValue(field.getName()))); + } + + if(keysSeenInPreviousPages.contains(ukValues)) + { + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // skip this key if it was in a previous page (on a different record, but the same key, so *this* record would have been found & dealt with there) // + ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + LOG.trace("Not re-processing a key from a previous page: " + ukValues); + continue; + } + + filter.addSubFilter(subFilter); + keysSeenInPreviousPages.add(ukValues); + } + + if(CollectionUtils.nullSafeIsEmpty(filter.getSubFilters())) + { + LOG.trace("No sub-filters were added - all of these records were duplicates that were processed in previous pages."); + return; + } + + LOG.trace("For an input list of [" + runBackendStepInput.getRecords().size() + "] records, we have a query with [" + filter.getSubFilters().size() + "] sub-filters."); + + QueryInput queryInput = new QueryInput(); + queryInput.setTableName(tableName); + queryInput.setFilter(filter); + QueryOutput queryOutput = new QueryAction().execute(queryInput); + + ListingHash, QRecord> duplicatesMap = new ListingHash<>(); + for(QRecord record : queryOutput.getRecords()) + { + List ukValues = new ArrayList<>(); + for(QFieldMetaData field : uniqueKeyFields) + { + ukValues.add(record.getValue(field.getName())); + } + duplicatesMap.add(ukValues, record); + } + LOG.trace("Query for duplicates found [" + queryOutput.getRecords().size() + "] records with [" + duplicatesMap.keySet().size() + "] unique keys."); + + //////////////////////////////////////////////////////////////////////////////////////////////// + // let the subclass optionally do any needed pre-processing on the batch (e.g., bulk lookups) // + //////////////////////////////////////////////////////////////////////////////////////////////// + preProcess(duplicatesMap); + + ///////////////////////////////////////////////////////////////////////////////////////// + // process the entries - keys are the duplicated key, values are the duplicate records // + ///////////////////////////////////////////////////////////////////////////////////////// + for(Map.Entry, List> entry : duplicatesMap.entrySet()) + { + List duplicateRecords = entry.getValue(); + if(duplicateRecords.size() == 1) + { + //////////////////////////////////////////////////// + // if there aren't any duplicates here, note that // + //////////////////////////////////////////////////// + notADuplicate.incrementCountAndAddPrimaryKey(duplicateRecords.get(0).getValue(primaryKeyField)); + continue; + } + + try + { + QRecord recordToKeep = buildRecordToKeep(runBackendStepInput, duplicateRecords); + if(recordToKeep == null) + { + duplicateRecords.forEach(requestedToSkip::incrementCountAndAddPrimaryKey); + continue; + } + + runBackendStepOutput.addRecord(recordToKeep); + Serializable primaryKeyToKeep = recordToKeep.getValue(primaryKeyField); + if(primaryKeyToKeep == null) + { + okToInsert.incrementCount(); + + if(config.doAutomaticAudits) + { + // todo - how to get the id of the inserted record in here... + // todo - audit details w/ the ids of the others + AuditSingleInput auditSingleInput = new AuditSingleInput().forRecord(table, recordToKeep) + .withMessage("Merged " + duplicateRecords.size() + " records with the same " + ukLabels + " into a new record."); + auditInput.addAuditSingleInput(auditSingleInput); + } + } + else + { + LOG.trace("Decided to keep pkey [" + primaryKeyToKeep + "] for key [" + entry.getKey() + "]"); + okToUpdate.incrementCountAndAddPrimaryKey(primaryKeyToKeep); + + if(config.doAutomaticAudits) + { + // todo - audit details w/ the ids of the others + AuditSingleInput auditSingleInput = new AuditSingleInput().forRecord(table, recordToKeep) + .withMessage("Merged " + (duplicateRecords.size() - 1) + " other record" + StringUtils.plural(duplicateRecords.size() - 1) + " with the same " + ukLabels + " into this record."); + auditInput.addAuditSingleInput(auditSingleInput); + } + } + + for(QRecord duplicate : duplicateRecords) + { + Serializable duplicatePrimaryKey = duplicate.getValue(primaryKeyField); + if(!Objects.equals(primaryKeyToKeep, duplicatePrimaryKey)) + { + otherTableIdsToDelete.add(tableName, duplicatePrimaryKey); + okToDelete.incrementCountAndAddPrimaryKey(duplicatePrimaryKey); + LOG.trace("Decided to delete pkey [" + duplicate + "] for key [" + entry.getKey() + "]"); + + if(config.doAutomaticAudits) + { + auditInput.addAuditSingleInput(new AuditSingleInput().forRecord(table, duplicate) + .withMessage("Deleted this record while merging it with " + (primaryKeyToKeep == null ? "a new record" : primaryKeyToKeep))); + } + } + } + } + catch(SkipTheseRecordsException e) + { + duplicateRecords.forEach(requestedToSkip::incrementCountAndAddPrimaryKey); + } + } + + runBackendStepOutput.addValue("otherTableIdsToDelete", otherTableIdsToDelete); + runBackendStepOutput.addValue("otherTableFiltersToDelete", otherTableFiltersToDelete); + runBackendStepOutput.addValue("otherTableRecordsToStore", otherTableRecordsToStore); + + if(config.doAutomaticAudits) + { + runBackendStepOutput.addValue("auditInput", auditInput); + } + + //////////////////////////////////////////////// + // populate possible-values for review screen // + //////////////////////////////////////////////// + /* todo + if(RunProcessInput.FrontendStepBehavior.BREAK.equals(runBackendStepInput.getFrontendStepBehavior())) + { + if(CollectionUtils.nullSafeHasContents(runBackendStepOutput.getRecords())) + { + if(possibleValueTranslator == null) + { + possibleValueTranslator = new QPossibleValueTranslator(runBackendStepInput.getInstance(), runBackendStepInput.getSession()); + } + + possibleValueTranslator.translatePossibleValuesInRecords(runBackendStepInput.getInstance().getTable(uniqueKeyFieldNames), runBackendStepOutput.getRecords()); + } + } + */ + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static class SkipTheseRecordsException extends QException + { + /******************************************************************************* + ** + *******************************************************************************/ + public SkipTheseRecordsException(Throwable t) + { + super(t); + } + + + + /******************************************************************************* + ** Constructor + ** + *******************************************************************************/ + public SkipTheseRecordsException(String message) + { + super(message); + } + + + + /******************************************************************************* + ** Constructor + ** + *******************************************************************************/ + public SkipTheseRecordsException(String message, Throwable t) + { + super(message, t); + } + + } +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesLoadStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesLoadStep.java new file mode 100644 index 00000000..54f26ae6 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesLoadStep.java @@ -0,0 +1,132 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.mergeduplicates; + + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import com.kingsrook.qqq.backend.core.actions.audits.AuditAction; +import com.kingsrook.qqq.backend.core.actions.tables.DeleteAction; +import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; +import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction; +import com.kingsrook.qqq.backend.core.context.QContext; +import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.logging.QLogger; +import com.kingsrook.qqq.backend.core.model.actions.audits.AuditInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; +import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.LoadViaInsertOrUpdateStep; +import com.kingsrook.qqq.backend.core.utils.ListingHash; + + +/******************************************************************************* + ** Generic ETL Load step for a merge duplicates process. + ** + ** Uses otherTable* fields from the process state to do additional deletes, inserts + ** and/or updates. + *******************************************************************************/ +public class MergeDuplicatesLoadStep extends LoadViaInsertOrUpdateStep +{ + private static final QLogger LOG = QLogger.getLogger(MergeDuplicatesLoadStep.class); + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException + { + super.run(runBackendStepInput, runBackendStepOutput); + + ListingHash otherTableIdsToDelete = (ListingHash) runBackendStepInput.getValue("otherTableIdsToDelete"); + ListingHash otherTableFiltersToDelete = (ListingHash) runBackendStepInput.getValue("otherTableFiltersToDelete"); + ListingHash otherTableRecordsToStore = (ListingHash) runBackendStepInput.getValue("otherTableRecordsToStore"); + + if(otherTableIdsToDelete != null) + { + for(String tableName : otherTableIdsToDelete.keySet()) + { + DeleteInput deleteInput = new DeleteInput(); + deleteInput.setTableName(tableName); + deleteInput.setPrimaryKeys(new ArrayList<>(otherTableIdsToDelete.get(tableName))); + getTransaction().ifPresent(deleteInput::setTransaction); + new DeleteAction().execute(deleteInput); + } + } + + if(otherTableFiltersToDelete != null) + { + for(String tableName : otherTableFiltersToDelete.keySet()) + { + for(QQueryFilter filter : otherTableFiltersToDelete.get(tableName)) + { + DeleteInput deleteInput = new DeleteInput(); + deleteInput.setTableName(tableName); + deleteInput.setQueryFilter(filter); + getTransaction().ifPresent(deleteInput::setTransaction); + new DeleteAction().execute(deleteInput); + } + } + } + + if(otherTableRecordsToStore != null) + { + for(String tableName : otherTableRecordsToStore.keySet()) + { + QTableMetaData table = QContext.getQInstance().getTable(tableName); + + List recordsToInsert = new ArrayList<>(); + List recordsToUpdate = new ArrayList<>(); + + splitRecordsForInsertOrUpdate(otherTableRecordsToStore.get(tableName), table.getPrimaryKeyField(), recordsToInsert, recordsToUpdate); + + InsertInput insertInput = new InsertInput(); + insertInput.setTableName(tableName); + insertInput.setRecords(recordsToInsert); + getTransaction().ifPresent(insertInput::setTransaction); + new InsertAction().execute(insertInput); + + UpdateInput updateInput = new UpdateInput(); + updateInput.setTableName(tableName); + updateInput.setRecords(recordsToUpdate); + getTransaction().ifPresent(updateInput::setTransaction); + new UpdateAction().execute(updateInput); + } + } + + AuditInput auditInput = (AuditInput) runBackendStepInput.getValue("auditInput"); + if(auditInput != null) + { + // todo exec async? + new AuditAction().execute(auditInput); + } + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcess.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcess.java new file mode 100644 index 00000000..899356f7 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcess.java @@ -0,0 +1,238 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.mergeduplicates; + + +import java.util.Collections; +import java.util.List; +import com.kingsrook.qqq.backend.core.exceptions.QRuntimeException; +import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference; +import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType; +import com.kingsrook.qqq.backend.core.model.metadata.layout.QIcon; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QFrontendStepMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.scheduleing.QScheduleMetaData; +import com.kingsrook.qqq.backend.core.processes.implementations.basepull.BasepullConfiguration; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractLoadStep; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ExtractViaQueryStep; +import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess; + + +/******************************************************************************* + ** Definition for Standard process to merge duplicate records in a table. + ** + *******************************************************************************/ +public class MergeDuplicatesProcess +{ + public static final String FIELD_SOURCE_TABLE_KEY_FIELD = "sourceTableKeyField"; // String + public static final String FIELD_UNIQUE_KEYS_FIELD = "uniqueKeys"; // String + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static Builder processMetaDataBuilder() + { + return (Builder) new Builder(StreamedETLWithFrontendProcess.defineProcessMetaData( + ExtractViaQueryStep.class, + null, + MergeDuplicatesLoadStep.class, + Collections.emptyMap())) + .withFields(List.of( + new QFieldMetaData(FIELD_SOURCE_TABLE_KEY_FIELD, QFieldType.STRING), + new QFieldMetaData(FIELD_UNIQUE_KEYS_FIELD, QFieldType.STRING) + )) + .withPreviewMessage(StreamedETLWithFrontendProcess.DEFAULT_PREVIEW_MESSAGE_FOR_INSERT_OR_UPDATE); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static class Builder extends StreamedETLWithFrontendProcess.Builder + { + + /******************************************************************************* + ** Constructor + ** + *******************************************************************************/ + public Builder(QProcessMetaData processMetaData) + { + super(processMetaData); + } + + + + /******************************************************************************* + ** Fluent setter for transformStepClass + ** + *******************************************************************************/ + public Builder withTransformStepClass(Class transformStepClass) + { + throw (new IllegalArgumentException("withTransformStepClass should not be called in a TableSyncProcess. You probably meant withMergeDuplicatesTransformStepClass")); + } + + + + /******************************************************************************* + ** Fluent setter for loadStepClass + ** + *******************************************************************************/ + public Builder withLoadStepClass(Class loadStepClass) + { + super.withLoadStepClass(loadStepClass); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter for transformStepClass. Note - call this method also makes + ** sourceTable be set - by getting it from the SyncProcessConfig record defined + ** in the step class. + ** + *******************************************************************************/ + public Builder withMergeDuplicatesTransformStepClass(Class transformStepClass) + { + setInputFieldDefaultValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE, new QCodeReference(transformStepClass)); + AbstractMergeDuplicatesTransformStep.MergeProcessConfig config; + + try + { + AbstractMergeDuplicatesTransformStep transformStep = transformStepClass.getConstructor().newInstance(); + config = transformStep.getMergeProcessConfig(); + } + catch(Exception e) + { + throw (new QRuntimeException("Error setting up process with transform step class: " + transformStepClass.getName(), e)); + } + + setInputFieldDefaultValue(StreamedETLWithFrontendProcess.FIELD_SOURCE_TABLE, config.tableName()); + setInputFieldDefaultValue(StreamedETLWithFrontendProcess.FIELD_DESTINATION_TABLE, config.tableName()); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter for name + ** + *******************************************************************************/ + public Builder withName(String name) + { + processMetaData.setName(name); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter for label + ** + *******************************************************************************/ + public Builder withLabel(String name) + { + processMetaData.setLabel(name); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter for tableName + ** + *******************************************************************************/ + public Builder withTableName(String tableName) + { + processMetaData.setTableName(tableName); + return (this); + } + + + + /******************************************************************************* + ** Fluent setter for icon + ** + *******************************************************************************/ + public Builder withIcon(QIcon icon) + { + processMetaData.setIcon(icon); + return (this); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public Builder withReviewStepRecordFields(List fieldList) + { + QFrontendStepMetaData reviewStep = processMetaData.getFrontendStep(StreamedETLWithFrontendProcess.STEP_NAME_REVIEW); + for(QFieldMetaData fieldMetaData : fieldList) + { + reviewStep.withRecordListField(fieldMetaData); + } + + return (this); + } + + + + /******************************************************************************* + ** Attach more input fields to the process (to its first step) + *******************************************************************************/ + public Builder withFields(List fieldList) + { + super.withFields(fieldList); + return (this); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public Builder withBasepullConfiguration(BasepullConfiguration basepullConfiguration) + { + processMetaData.setBasepullConfiguration(basepullConfiguration); + return (this); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public Builder withSchedule(QScheduleMetaData schedule) + { + processMetaData.setSchedule(schedule); + return (this); + } + + } +} diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcessTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcessTest.java new file mode 100644 index 00000000..ffc49016 --- /dev/null +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/mergeduplicates/MergeDuplicatesProcessTest.java @@ -0,0 +1,226 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2023. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.processes.implementations.mergeduplicates; + + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import com.kingsrook.qqq.backend.core.BaseTest; +import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction; +import com.kingsrook.qqq.backend.core.context.QContext; +import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; +import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; +import com.kingsrook.qqq.backend.core.model.data.QRecord; +import com.kingsrook.qqq.backend.core.model.metadata.QInstance; +import com.kingsrook.qqq.backend.core.processes.utils.GeneralProcessUtils; +import com.kingsrook.qqq.backend.core.utils.TestUtils; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + + +/******************************************************************************* + ** Unit test for MergeDuplicatesProcess + *******************************************************************************/ +class MergeDuplicatesProcessTest extends BaseTest +{ + String PROCESS_NAME = "testMergeProcess"; + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Test + void test() throws Exception + { + QInstance qInstance = QContext.getQInstance(); + addProcessToInstance(); + + TestUtils.insertRecords(qInstance, qInstance.getTable(TestUtils.TABLE_NAME_PERSON_MEMORY), List.of( + new QRecord().withValue("id", 1).withValue("firstName", "Darin").withValue("noOfShoes", 1).withValue("favoriteShapeId", 11), + new QRecord().withValue("id", 2).withValue("firstName", "Tim").withValue("noOfShoes", 2).withValue("favoriteShapeId", 12), + new QRecord().withValue("id", 3).withValue("firstName", "Tyler").withValue("noOfShoes", 1).withValue("favoriteShapeId", 13), + new QRecord().withValue("id", 4).withValue("firstName", "Darin").withValue("noOfShoes", 1).withValue("favoriteShapeId", 14), + new QRecord().withValue("id", 5).withValue("firstName", "Darin").withValue("noOfShoes", 1).withValue("favoriteShapeId", 15), + new QRecord().withValue("id", 6).withValue("firstName", "James").withValue("noOfShoes", 1).withValue("favoriteShapeId", 16), + new QRecord().withValue("id", 7).withValue("firstName", "James").withValue("noOfShoes", 1).withValue("favoriteShapeId", 17) + )); + + TestUtils.insertRecords(qInstance, qInstance.getTable(TestUtils.TABLE_NAME_SHAPE), List.of( + new QRecord().withValue("id", 11).withValue("favoredByNoOfPeople", 1), + new QRecord().withValue("id", 12).withValue("favoredByNoOfPeople", 1), + new QRecord().withValue("id", 13).withValue("favoredByNoOfPeople", 1), + new QRecord().withValue("id", 14).withValue("favoredByNoOfPeople", 1), + new QRecord().withValue("id", 15).withValue("favoredByNoOfPeople", 1), + new QRecord().withValue("id", 16).withValue("favoredByNoOfPeople", 1), + new QRecord().withValue("id", 17).withValue("favoredByNoOfPeople", 1) + )); + + RunProcessInput runProcessInput = new RunProcessInput(); + runProcessInput.setProcessName(PROCESS_NAME); + runProcessInput.addValue("recordIds", "1,2,3,4,5,6,7"); + runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); + + RunProcessAction runProcessAction = new RunProcessAction(); + RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput); + + @SuppressWarnings("unchecked") + ArrayList processResults = (ArrayList) runProcessOutput.getValues().get("processResults"); + + assertThat(processResults.get(0)) + .hasFieldOrPropertyWithValue("message", "was updated") + .hasFieldOrPropertyWithValue("count", 1); + + assertThat(processResults.get(1)) + .hasFieldOrPropertyWithValue("message", "were deleted") + .hasFieldOrPropertyWithValue("count", 2); + + assertThat(processResults.get(2)) + .hasFieldOrPropertyWithValue("message", "did not have any duplicates.") + .hasFieldOrPropertyWithValue("count", 2); + + assertThat(processResults.get(3)) + .hasFieldOrPropertyWithValue("message", "were skipped, because it was not clear how they should have been processed.") + .hasFieldOrPropertyWithValue("count", 2); + + ///////////////////////////////////////////////// + // make sure records 4 and 5 have been deleted // + ///////////////////////////////////////////////// + Map personMap = GeneralProcessUtils.loadTableToMap(runProcessInput, TestUtils.TABLE_NAME_PERSON_MEMORY, "id"); + assertEquals(5, personMap.size()); + assertNull(personMap.get(4)); + assertNull(personMap.get(5)); + + //////////////////////////////////////////////// + // make sure person 1's noOfShoes was updated // + //////////////////////////////////////////////// + assertEquals(3, personMap.get(1).getValueInteger("noOfShoes")); + + ///////////////////////////////////////////////////////////////////////////// + // make sure the shapes corresponding to records 4 and 5 have been deleted // + ///////////////////////////////////////////////////////////////////////////// + Map shapesMap = GeneralProcessUtils.loadTableToMap(runProcessInput, TestUtils.TABLE_NAME_SHAPE, "id"); + assertEquals(5, shapesMap.size()); + assertNull(shapesMap.get(4)); + assertNull(shapesMap.get(5)); + + /////////////////////////////////////////////// + // make sure the 'other table' got an update // + /////////////////////////////////////////////// + assertEquals(3, shapesMap.get(11).getValueInteger("favoredByNoOfPeople")); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private void addProcessToInstance() + { + QInstance qInstance = QContext.getQInstance(); + + qInstance.addProcess(MergeDuplicatesProcess.processMetaDataBuilder() + .withName(PROCESS_NAME) + .withTableName(TestUtils.TABLE_NAME_PERSON_MEMORY) + .withMergeDuplicatesTransformStepClass(PersonMergeDuplicatesStep.class) + .getProcessMetaData()); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + public static class PersonMergeDuplicatesStep extends AbstractMergeDuplicatesTransformStep + { + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + protected MergeProcessConfig getMergeProcessConfig() + { + return (new MergeProcessConfig(TestUtils.TABLE_NAME_PERSON_MEMORY, List.of("firstName"), true)); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Override + public QRecord buildRecordToKeep(RunBackendStepInput runBackendStepInput, List duplicateRecords) throws SkipTheseRecordsException + { + ///////////////////////////////////// + // keep the one with the lowest id // + // add the other one's shoes to it // + ///////////////////////////////////// + QRecord recordToKeep = duplicateRecords.get(0); + int totalNoOfShoes = 0; + for(QRecord duplicateRecord : duplicateRecords) + { + totalNoOfShoes += duplicateRecord.getValueInteger("noOfShoes"); + if(duplicateRecord.getValueInteger("id") < recordToKeep.getValueInteger("id")) + { + recordToKeep = duplicateRecord; + } + + if(duplicateRecord.getValueString("firstName").equals("James")) + { + throw (new SkipTheseRecordsException("We don't want to mess with a James record...")); + } + } + + ////////////////////////////////////////////////////////////////////////// + // for ones that we aren't keeping, set to delete their favorite shapes // + ////////////////////////////////////////////////////////////////////////// + for(QRecord duplicateRecord : duplicateRecords) + { + if(duplicateRecord != recordToKeep) + { + addOtherTableIdsToDelete(TestUtils.TABLE_NAME_SHAPE, List.of(duplicateRecord.getValueInteger("favoriteShapeId"))); + } + } + + //////////////////////////////////////////////////////////////// + // for the one that we are keeping, update its favorite shape // + //////////////////////////////////////////////////////////////// + addOtherTableRecordsToStore(TestUtils.TABLE_NAME_SHAPE, List.of(new QRecord() + .withValue("id", recordToKeep.getValue("favoriteShapeId")) + .withValue("favoredByNoOfPeople", duplicateRecords.size()) + )); + + recordToKeep.setValue("noOfShoes", totalNoOfShoes); + + return (recordToKeep); + } + + } + +} \ No newline at end of file