Add unique keys, and checking of them in bulk load; add some more validation (sqs and unique keys)

This commit is contained in:
2022-10-21 11:21:07 -05:00
parent 20c42deae5
commit fa2ab18e30
10 changed files with 1015 additions and 50 deletions

View File

@ -89,11 +89,11 @@ public class CsvToQRecordAdapter
*******************************************************************************/
public void buildRecordsFromCsv(InputWrapper inputWrapper)
{
String csv = inputWrapper.getCsv();
AbstractQFieldMapping<?> mapping = inputWrapper.getMapping();
Consumer<QRecord> recordCustomizer = inputWrapper.getRecordCustomizer();
QTableMetaData table = inputWrapper.getTable();
Integer limit = inputWrapper.getLimit();
String csv = inputWrapper.getCsv();
AbstractQFieldMapping<?> mapping = inputWrapper.getMapping();
Consumer<QRecord> recordCustomizer = inputWrapper.getRecordCustomizer();
QTableMetaData table = inputWrapper.getTable();
Integer limit = inputWrapper.getLimit();
if(!StringUtils.hasContent(csv))
{
@ -136,7 +136,7 @@ public class CsvToQRecordAdapter
headers = makeHeadersUnique(headers);
Iterator<CSVRecord> csvIterator = csvParser.iterator();
int recordCount = 0;
int recordCount = 0;
while(csvIterator.hasNext())
{
CSVRecord csvRecord = csvIterator.next();
@ -147,7 +147,8 @@ public class CsvToQRecordAdapter
Map<String, String> csvValues = new HashMap<>();
for(int i = 0; i < headers.size() && i < csvRecord.size(); i++)
{
csvValues.put(headers.get(i), csvRecord.get(i));
String header = adjustHeaderCase(headers.get(i), inputWrapper);
csvValues.put(header, csvRecord.get(i));
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -157,6 +158,7 @@ public class CsvToQRecordAdapter
for(QFieldMetaData field : table.getFields().values())
{
String fieldSource = mapping == null ? field.getName() : String.valueOf(mapping.getFieldSource(field.getName()));
fieldSource = adjustHeaderCase(fieldSource, inputWrapper);
qRecord.setValue(field.getName(), csvValues.get(fieldSource));
}
@ -180,7 +182,7 @@ public class CsvToQRecordAdapter
.withTrim());
Iterator<CSVRecord> csvIterator = csvParser.iterator();
int recordCount = 0;
int recordCount = 0;
while(csvIterator.hasNext())
{
CSVRecord csvRecord = csvIterator.next();
@ -228,6 +230,20 @@ public class CsvToQRecordAdapter
/*******************************************************************************
**
*******************************************************************************/
private String adjustHeaderCase(String s, InputWrapper inputWrapper)
{
if(inputWrapper.caseSensitiveHeaders)
{
return (s);
}
return (s.toLowerCase());
}
/*******************************************************************************
**
*******************************************************************************/
@ -325,6 +341,8 @@ public class CsvToQRecordAdapter
private Consumer<QRecord> recordCustomizer;
private Integer limit;
private boolean caseSensitiveHeaders = false;
/*******************************************************************************
@ -529,6 +547,40 @@ public class CsvToQRecordAdapter
return (this);
}
/*******************************************************************************
** Getter for caseSensitiveHeaders
**
*******************************************************************************/
public boolean getCaseSensitiveHeaders()
{
return caseSensitiveHeaders;
}
/*******************************************************************************
** Setter for caseSensitiveHeaders
**
*******************************************************************************/
public void setCaseSensitiveHeaders(boolean caseSensitiveHeaders)
{
this.caseSensitiveHeaders = caseSensitiveHeaders;
}
/*******************************************************************************
** Fluent setter for caseSensitiveHeaders
**
*******************************************************************************/
public InputWrapper withCaseSensitiveHeaders(boolean caseSensitiveHeaders)
{
this.caseSensitiveHeaders = caseSensitiveHeaders;
return (this);
}
}
}

View File

@ -49,9 +49,11 @@ import com.kingsrook.qqq.backend.core.model.metadata.layout.QAppSection;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QBackendStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QFieldSection;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.Tier;
import com.kingsrook.qqq.backend.core.model.metadata.tables.UniqueKey;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTracking;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.AutomationStatusTrackingType;
import com.kingsrook.qqq.backend.core.model.metadata.tables.automation.QTableAutomationDetails;
@ -118,6 +120,7 @@ public class QInstanceValidator
validateProcesses(qInstance);
validateApps(qInstance);
validatePossibleValueSources(qInstance);
validateQueuesAndProviders(qInstance);
}
catch(Exception e)
{
@ -134,6 +137,45 @@ public class QInstanceValidator
/*******************************************************************************
**
*******************************************************************************/
private void validateQueuesAndProviders(QInstance qInstance)
{
if(CollectionUtils.nullSafeHasContents(qInstance.getQueueProviders()))
{
qInstance.getQueueProviders().forEach((name, queueProvider) ->
{
assertCondition(Objects.equals(name, queueProvider.getName()), "Inconsistent naming for queueProvider: " + name + "/" + queueProvider.getName() + ".");
assertCondition(queueProvider.getType() != null, "Missing type for queueProvider: " + name);
if(queueProvider instanceof SQSQueueProviderMetaData sqsQueueProvider)
{
assertCondition(StringUtils.hasContent(sqsQueueProvider.getAccessKey()), "Missing accessKey for SQSQueueProvider: " + name);
assertCondition(StringUtils.hasContent(sqsQueueProvider.getSecretKey()), "Missing secretKey for SQSQueueProvider: " + name);
assertCondition(StringUtils.hasContent(sqsQueueProvider.getBaseURL()), "Missing baseURL for SQSQueueProvider: " + name);
assertCondition(StringUtils.hasContent(sqsQueueProvider.getRegion()), "Missing region for SQSQueueProvider: " + name);
}
});
}
if(CollectionUtils.nullSafeHasContents(qInstance.getQueues()))
{
qInstance.getQueues().forEach((name, queue) ->
{
assertCondition(Objects.equals(name, queue.getName()), "Inconsistent naming for queue: " + name + "/" + queue.getName() + ".");
assertCondition(qInstance.getQueueProvider(queue.getProviderName()) != null, "Unrecognized queue providerName for queue: " + name);
assertCondition(StringUtils.hasContent(queue.getQueueName()), "Missing queueName for queue: " + name);
if(assertCondition(StringUtils.hasContent(queue.getProcessName()), "Missing processName for queue: " + name))
{
assertCondition(qInstance.getProcesses() != null && qInstance.getProcess(queue.getProcessName()) != null, "Unrecognized processName for queue: " + name);
}
});
}
}
/*******************************************************************************
**
*******************************************************************************/
@ -263,12 +305,46 @@ public class QInstanceValidator
{
validateTableAutomationDetails(qInstance, table);
}
//////////////////////////////////////
// validate the table's unique keys //
//////////////////////////////////////
if(table.getUniqueKeys() != null)
{
validateTableUniqueKeys(table);
}
});
}
}
/*******************************************************************************
**
*******************************************************************************/
private void validateTableUniqueKeys(QTableMetaData table)
{
Set<Set<String>> ukSets = new HashSet<>();
for(UniqueKey uniqueKey : table.getUniqueKeys())
{
if(assertCondition(CollectionUtils.nullSafeHasContents(uniqueKey.getFieldNames()), table.getName() + " has a uniqueKey with no fields"))
{
Set<String> fieldNamesInThisUK = new HashSet<>();
for(String fieldName : uniqueKey.getFieldNames())
{
assertNoException(() -> table.getField(fieldName), table.getName() + " has a uniqueKey with an unrecognized field name: " + fieldName);
assertCondition(!fieldNamesInThisUK.contains(fieldName), table.getName() + " has a uniqueKey with the same field multiple times: " + fieldName);
fieldNamesInThisUK.add(fieldName);
}
assertCondition(!ukSets.contains(fieldNamesInThisUK), table.getName() + " has more than one uniqueKey with the same set of fields: " + fieldNamesInThisUK);
ukSets.add(fieldNamesInThisUK);
}
}
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -286,4 +286,19 @@ public class QQueryFilter implements Serializable, Cloneable
return (this);
}
/*******************************************************************************
**
*******************************************************************************/
public void addSubFilter(QQueryFilter subFilter)
{
if(this.subFilters == null)
{
subFilters = new ArrayList<>();
}
subFilters.add(subFilter);
}
}

View File

@ -63,6 +63,7 @@ public class QTableMetaData implements QAppChildMetaData, Serializable
private boolean isHidden = false;
private Map<String, QFieldMetaData> fields;
private List<UniqueKey> uniqueKeys;
private QTableBackendDetails backendDetails;
private QTableAutomationDetails automationDetails;
@ -751,4 +752,54 @@ public class QTableMetaData implements QAppChildMetaData, Serializable
return (this);
}
/*******************************************************************************
** Getter for uniqueKeys
**
*******************************************************************************/
public List<UniqueKey> getUniqueKeys()
{
return uniqueKeys;
}
/*******************************************************************************
** Setter for uniqueKeys
**
*******************************************************************************/
public void setUniqueKeys(List<UniqueKey> uniqueKeys)
{
this.uniqueKeys = uniqueKeys;
}
/*******************************************************************************
** Fluent setter for uniqueKeys
**
*******************************************************************************/
public QTableMetaData withUniqueKeys(List<UniqueKey> uniqueKeys)
{
this.uniqueKeys = uniqueKeys;
return (this);
}
/*******************************************************************************
** Fluent setter for uniqueKeys
**
*******************************************************************************/
public QTableMetaData withUniqueKey(UniqueKey uniqueKey)
{
if(this.uniqueKeys == null)
{
this.uniqueKeys = new ArrayList<>();
}
this.uniqueKeys.add(uniqueKey);
return (this);
}
}

View File

@ -0,0 +1,120 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.model.metadata.tables;
import java.util.ArrayList;
import java.util.List;
/*******************************************************************************
** Definition of a Unique Key (or "Constraint", if you wanna use fancy words)
** on a QTable.
*******************************************************************************/
public class UniqueKey
{
private List<String> fieldNames;
private String label;
/*******************************************************************************
** Getter for fieldNames
**
*******************************************************************************/
public List<String> getFieldNames()
{
return fieldNames;
}
/*******************************************************************************
** Setter for fieldNames
**
*******************************************************************************/
public void setFieldNames(List<String> fieldNames)
{
this.fieldNames = fieldNames;
}
/*******************************************************************************
** Fluent setter for fieldNames
**
*******************************************************************************/
public UniqueKey withFieldNames(List<String> fieldNames)
{
this.fieldNames = fieldNames;
return (this);
}
/*******************************************************************************
** Getter for label
**
*******************************************************************************/
public String getLabel()
{
return label;
}
/*******************************************************************************
** Setter for label
**
*******************************************************************************/
public void setLabel(String label)
{
this.label = label;
}
/*******************************************************************************
** Fluent setter for label
**
*******************************************************************************/
public UniqueKey withLabel(String label)
{
this.label = label;
return (this);
}
/*******************************************************************************
**
*******************************************************************************/
public UniqueKey withFieldName(String fieldName)
{
if(this.fieldNames == null)
{
this.fieldNames = new ArrayList<>();
}
this.fieldNames.add(fieldName);
return (this);
}
}

View File

@ -22,16 +22,33 @@
package com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
import com.kingsrook.qqq.backend.core.model.actions.processes.Status;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.UniqueKey;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.AbstractTransformStep;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.StreamedETLWithFrontendProcess;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import com.kingsrook.qqq.backend.core.utils.StringUtils;
/*******************************************************************************
@ -39,9 +56,12 @@ import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwith
*******************************************************************************/
public class BulkInsertTransformStep extends AbstractTransformStep
{
private ProcessSummaryLine okSummary = new ProcessSummaryLine(Status.OK);
private ProcessSummaryLine okSummary = new ProcessSummaryLine(Status.OK);
private Map<UniqueKey, ProcessSummaryLine> ukErrorSummaries = new HashMap<>();
private String tableLabel;
private QTableMetaData table;
private Map<UniqueKey, Set<List<Serializable>>> keysInThisFile = new HashMap<>();
@ -51,14 +71,7 @@ public class BulkInsertTransformStep extends AbstractTransformStep
@Override
public void preRun(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
///////////////////////////////////////////////////////
// capture the table label - for the process summary //
///////////////////////////////////////////////////////
QTableMetaData table = runBackendStepInput.getInstance().getTable(runBackendStepInput.getTableName());
if(table != null)
{
tableLabel = table.getLabel();
}
this.table = runBackendStepInput.getInstance().getTable(runBackendStepInput.getTableName());
}
@ -69,6 +82,16 @@ public class BulkInsertTransformStep extends AbstractTransformStep
@Override
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
{
QTableMetaData table = runBackendStepInput.getInstance().getTable(runBackendStepInput.getTableName());
Map<UniqueKey, Set<List<Serializable>>> existingKeys = new HashMap<>();
List<UniqueKey> uniqueKeys = CollectionUtils.nonNullList(table.getUniqueKeys());
for(UniqueKey uniqueKey : uniqueKeys)
{
existingKeys.put(uniqueKey, getExistingKeys(runBackendStepInput, uniqueKey));
ukErrorSummaries.computeIfAbsent(uniqueKey, x -> new ProcessSummaryLine(Status.ERROR));
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// on the validate step, we haven't read the full file, so we don't know how many rows there are - thus //
// record count is null, and the ValidateStep won't be setting status counters - so - do it here in that case. //
@ -81,19 +104,153 @@ public class BulkInsertTransformStep extends AbstractTransformStep
{
if(runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_RECORD_COUNT) == null)
{
runBackendStepInput.getAsyncJobCallback().updateStatus("Inserting " + tableLabel + " record " + "%,d".formatted(okSummary.getCount()));
runBackendStepInput.getAsyncJobCallback().updateStatus("Inserting " + table.getLabel() + " record " + "%,d".formatted(okSummary.getCount()));
}
else
{
runBackendStepInput.getAsyncJobCallback().updateStatus("Inserting " + tableLabel + " records");
runBackendStepInput.getAsyncJobCallback().updateStatus("Inserting " + table.getLabel() + " records");
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////
// no transformation needs done - just pass records through from input to output, and assume all are OK //
//////////////////////////////////////////////////////////////////////////////////////////////////////////
runBackendStepOutput.setRecords(runBackendStepInput.getRecords());
okSummary.incrementCount(runBackendStepInput.getRecords().size());
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// no transformation needs to be done - just pass records through from input to output, if they don't violate any UK's //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////
// if there are no UK's, just output all records //
///////////////////////////////////////////////////
if(existingKeys.isEmpty())
{
runBackendStepOutput.setRecords(runBackendStepInput.getRecords());
okSummary.incrementCount(runBackendStepInput.getRecords().size());
}
else
{
for(UniqueKey uniqueKey : uniqueKeys)
{
keysInThisFile.computeIfAbsent(uniqueKey, x -> new HashSet<>());
}
///////////////////////////////////////////////////////////////////////////
// else, get each records keys and see if it already exists or not //
// also, build a set of keys we've seen (within this page (or overall?)) //
///////////////////////////////////////////////////////////////////////////
for(QRecord record : runBackendStepInput.getRecords())
{
//////////////////////////////////////////////////////////
// check if this record violates any of the unique keys //
//////////////////////////////////////////////////////////
boolean foundDupe = false;
for(UniqueKey uniqueKey : uniqueKeys)
{
List<Serializable> keyValues = getKeyValues(uniqueKey, record);
if(existingKeys.get(uniqueKey).contains(keyValues) || keysInThisFile.get(uniqueKey).contains(keyValues))
{
ukErrorSummaries.get(uniqueKey).incrementCount();
foundDupe = true;
break;
}
}
///////////////////////////////////////////////////////////////////////////////
// if this record doesn't violate any uk's, then we can add it to the output //
///////////////////////////////////////////////////////////////////////////////
if(!foundDupe)
{
for(UniqueKey uniqueKey : uniqueKeys)
{
List<Serializable> keyValues = getKeyValues(uniqueKey, record);
keysInThisFile.get(uniqueKey).add(keyValues);
}
okSummary.incrementCount();
runBackendStepOutput.addRecord(record);
}
}
}
}
/*******************************************************************************
**
*******************************************************************************/
private Set<List<Serializable>> getExistingKeys(RunBackendStepInput runBackendStepInput, UniqueKey uniqueKey) throws QException
{
return (getExistingKeys(runBackendStepInput, uniqueKey.getFieldNames()));
}
/*******************************************************************************
**
*******************************************************************************/
private Set<List<Serializable>> getExistingKeys(RunBackendStepInput runBackendStepInput, List<String> ukFieldNames) throws QException
{
Set<List<Serializable>> existingRecords = new HashSet<>();
if(ukFieldNames != null)
{
QueryInput queryInput = new QueryInput(runBackendStepInput.getInstance());
queryInput.setSession(runBackendStepInput.getSession());
queryInput.setTableName(runBackendStepInput.getTableName());
getTransaction().ifPresent(queryInput::setTransaction);
QQueryFilter filter = new QQueryFilter();
if(ukFieldNames.size() == 1)
{
List<Serializable> values = runBackendStepInput.getRecords().stream()
.map(r -> r.getValue(ukFieldNames.get(0)))
.collect(Collectors.toList());
filter.addCriteria(new QFilterCriteria(ukFieldNames.get(0), QCriteriaOperator.IN, values));
}
else
{
filter.setBooleanOperator(QQueryFilter.BooleanOperator.OR);
for(QRecord record : runBackendStepInput.getRecords())
{
QQueryFilter subFilter = new QQueryFilter();
filter.addSubFilter(subFilter);
for(String fieldName : ukFieldNames)
{
subFilter.addCriteria(new QFilterCriteria(fieldName, QCriteriaOperator.EQUALS, List.of(record.getValue(fieldName))));
}
}
}
queryInput.setFilter(filter);
QueryOutput queryOutput = new QueryAction().execute(queryInput);
for(QRecord record : queryOutput.getRecords())
{
List<Serializable> keyValues = getKeyValues(ukFieldNames, record);
existingRecords.add(keyValues);
}
}
return (existingRecords);
}
/*******************************************************************************
**
*******************************************************************************/
private List<Serializable> getKeyValues(UniqueKey uniqueKey, QRecord record)
{
return (getKeyValues(uniqueKey.getFieldNames(), record));
}
/*******************************************************************************
**
*******************************************************************************/
private List<Serializable> getKeyValues(List<String> fieldNames, QRecord record)
{
List<Serializable> keyValues = new ArrayList<>();
for(String fieldName : fieldNames)
{
keyValues.add(record.getValue(fieldName));
}
return keyValues;
}
@ -104,17 +261,50 @@ public class BulkInsertTransformStep extends AbstractTransformStep
@Override
public ArrayList<ProcessSummaryLineInterface> getProcessSummary(RunBackendStepOutput runBackendStepOutput, boolean isForResultScreen)
{
if(isForResultScreen)
{
okSummary.setMessage(tableLabel + " records were inserted.");
}
else
{
okSummary.setMessage(tableLabel + " records will be inserted.");
}
String tableLabel = table == null ? "" : table.getLabel();
okSummary
.withSingularFutureMessage(tableLabel + " record will be inserted")
.withPluralFutureMessage(tableLabel + " records will be inserted")
.withSingularPastMessage(tableLabel + " record was inserted")
.withPluralPastMessage(tableLabel + " records were inserted");
ArrayList<ProcessSummaryLineInterface> rs = new ArrayList<>();
rs.add(okSummary);
okSummary.addSelfToListIfAnyCount(rs);
for(Map.Entry<UniqueKey, ProcessSummaryLine> entry : ukErrorSummaries.entrySet())
{
UniqueKey uniqueKey = entry.getKey();
ProcessSummaryLine ukErrorSummary = entry.getValue();
String ukErrorSuffix = " inserted, because they contain a duplicate key (" + getUkDescription(uniqueKey.getFieldNames()) + ")";
ukErrorSummary
.withSingularFutureMessage(tableLabel + " record will not be" + ukErrorSuffix)
.withPluralFutureMessage(tableLabel + " records will not be" + ukErrorSuffix)
.withSingularPastMessage(tableLabel + " record was not" + ukErrorSuffix)
.withPluralPastMessage(tableLabel + " records were not" + ukErrorSuffix);
ukErrorSummary.addSelfToListIfAnyCount(rs);
}
return (rs);
}
/*******************************************************************************
**
*******************************************************************************/
private String getUkDescription(List<String> ukFieldNames)
{
List<String> fieldLabels = new ArrayList<>();
for(String fieldName : ukFieldNames)
{
fieldLabels.add(table.getField(fieldName).getLabel());
}
return (StringUtils.joinWithCommasAndAnd(fieldLabels));
}
}