Add primary keys to process summary lines (and thus traces) for bulk load; better handling of errors and warnings also from bulk insert result step

This commit is contained in:
2025-04-25 16:05:54 -05:00
parent f81b257dd4
commit e5987238e6
5 changed files with 279 additions and 7 deletions

View File

@ -41,6 +41,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.LoadViaInsertStep;
import com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend.ProcessSummaryProviderInterface;
import com.kingsrook.qqq.backend.core.processes.implementations.general.ProcessSummaryWarningsAndErrorsRollup;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
@ -77,19 +78,77 @@ public class BulkInsertLoadStep extends LoadViaInsertStep implements ProcessSumm
QTableMetaData table = QContext.getQInstance().getTable(runBackendStepInput.getValueString("tableName"));
/////////////////////////////////////////////////////////////////////////////////////////////
// the transform step builds summary lines that it predicts will insert successfully. //
// but those lines don't have ids, which we'd like to have (e.g., for a process trace that //
// might link to the built record). also, it's possible that there was a fail that only //
// happened in the actual insert, so, basically, re-do the summary here //
/////////////////////////////////////////////////////////////////////////////////////////////
BulkInsertTransformStep transformStep = (BulkInsertTransformStep) getTransformStep();
ProcessSummaryLine okSummary = transformStep.okSummary;
okSummary.setCount(0);
okSummary.setPrimaryKeys(new ArrayList<>());
//////////////////////////////////////////////////////////////////////////////////////////////////////////
// but - since errors from the transform step don't even make it through to us here in the load step, //
// do re-use the ProcessSummaryWarningsAndErrorsRollup from transform step as follows: //
// clear out its warnings - we'll completely rebuild them here (with primary keys) //
// and add new error lines, e.g., in case of errors that only happened past the validation if possible. //
//////////////////////////////////////////////////////////////////////////////////////////////////////////
ProcessSummaryWarningsAndErrorsRollup processSummaryWarningsAndErrorsRollup = transformStep.processSummaryWarningsAndErrorsRollup;
processSummaryWarningsAndErrorsRollup.resetWarnings();
List<QRecord> insertedRecords = runBackendStepOutput.getRecords();
for(QRecord insertedRecord : insertedRecords)
{
if(CollectionUtils.nullSafeIsEmpty(insertedRecord.getErrors()))
Serializable primaryKey = insertedRecord.getValue(table.getPrimaryKeyField());
if(CollectionUtils.nullSafeIsEmpty(insertedRecord.getErrors()) && primaryKey != null)
{
/////////////////////////////////////////////////////////////////////////
// if the record had no errors, and we have a primary key for it, then //
// keep track of the range of primary keys (first and last) //
/////////////////////////////////////////////////////////////////////////
if(firstInsertedPrimaryKey == null)
{
firstInsertedPrimaryKey = insertedRecord.getValue(table.getPrimaryKeyField());
firstInsertedPrimaryKey = primaryKey;
}
lastInsertedPrimaryKey = insertedRecord.getValue(table.getPrimaryKeyField());
lastInsertedPrimaryKey = primaryKey;
if(!CollectionUtils.nullSafeIsEmpty(insertedRecord.getWarnings()))
{
/////////////////////////////////////////////////////////////////////////////
// if there were warnings on the inserted record, put it in a warning line //
/////////////////////////////////////////////////////////////////////////////
String message = insertedRecord.getWarnings().get(0).getMessage();
processSummaryWarningsAndErrorsRollup.addWarning(message, primaryKey);
}
else
{
////////////////////////////////////////////////////////////////////////
// if no warnings for the inserted record, then put it in the OK line //
////////////////////////////////////////////////////////////////////////
okSummary.incrementCountAndAddPrimaryKey(primaryKey);
}
}
else
{
//////////////////////////////////////////////////////////////////////
// else if there were errors or no primary key, build an error line //
//////////////////////////////////////////////////////////////////////
String message = "Failed to insert";
if(!CollectionUtils.nullSafeIsEmpty(insertedRecord.getErrors()))
{
//////////////////////////////////////////////////////////
// use the error message from the record if we have one //
//////////////////////////////////////////////////////////
message = insertedRecord.getErrors().get(0).getMessage();
}
processSummaryWarningsAndErrorsRollup.addError(message, primaryKey);
}
}
okSummary.pickMessage(true);
}

View File

@ -75,9 +75,9 @@ import com.kingsrook.qqq.backend.core.utils.ValueUtils;
*******************************************************************************/
public class BulkInsertTransformStep extends AbstractTransformStep
{
private ProcessSummaryLine okSummary = new ProcessSummaryLine(Status.OK);
ProcessSummaryLine okSummary = new ProcessSummaryLine(Status.OK);
private ProcessSummaryWarningsAndErrorsRollup processSummaryWarningsAndErrorsRollup = ProcessSummaryWarningsAndErrorsRollup.build("inserted")
ProcessSummaryWarningsAndErrorsRollup processSummaryWarningsAndErrorsRollup = ProcessSummaryWarningsAndErrorsRollup.build("inserted")
.withDoReplaceSingletonCountLinesWithSuffixOnly(false);
private ListingHash<String, RowValue> errorToExampleRowValueMap = new ListingHash<>();

View File

@ -195,7 +195,7 @@ public class ProcessSummaryWarningsAndErrorsRollup
{
if(otherWarningsSummary == null)
{
otherWarningsSummary = new ProcessSummaryLine(Status.WARNING).withMessageSuffix("records had an other warning.");
otherWarningsSummary = buildOtherWarningsSummary();
}
processSummaryLine = otherWarningsSummary;
}
@ -214,6 +214,27 @@ public class ProcessSummaryWarningsAndErrorsRollup
/***************************************************************************
**
***************************************************************************/
private static ProcessSummaryLine buildOtherWarningsSummary()
{
return new ProcessSummaryLine(Status.WARNING).withMessageSuffix("records had an other warning.");
}
/***************************************************************************
**
***************************************************************************/
public void resetWarnings()
{
warningSummaries.clear();
otherWarningsSummary = buildOtherWarningsSummary();
}
/*******************************************************************************
** Wrapper around AlphaNumericComparator for ProcessSummaryLineInterface that
** extracts string messages out.

View File

@ -185,4 +185,13 @@ public class ProcessSummaryLineInterfaceAssert extends AbstractAssert<ProcessSum
return (this);
}
/***************************************************************************
**
***************************************************************************/
public ProcessSummaryLineInterface getLine()
{
return actual;
}
}

View File

@ -29,15 +29,26 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import com.kingsrook.qqq.backend.core.BaseTest;
import com.kingsrook.qqq.backend.core.actions.customizers.AbstractPreInsertCustomizer;
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizerInterface;
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
import com.kingsrook.qqq.backend.core.actions.tables.StorageAction;
import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryAssert;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLine;
import com.kingsrook.qqq.backend.core.model.actions.processes.ProcessSummaryLineInterface;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput;
import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput;
import com.kingsrook.qqq.backend.core.model.actions.processes.Status;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.storage.StorageInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.code.QCodeReference;
import com.kingsrook.qqq.backend.core.model.metadata.frontend.QFrontendFieldMetaData;
import com.kingsrook.qqq.backend.core.model.statusmessages.BadInputStatusMessage;
import com.kingsrook.qqq.backend.core.model.statusmessages.QWarningMessage;
import com.kingsrook.qqq.backend.core.modules.backend.implementations.memory.MemoryRecordStore;
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.model.BulkLoadProfile;
import com.kingsrook.qqq.backend.core.processes.implementations.bulk.insert.model.BulkLoadProfileField;
@ -176,7 +187,15 @@ class BulkInsertFullProcessTest extends BaseTest
assertThat(runProcessOutput.getValues().get(StreamedETLWithFrontendProcess.FIELD_PROCESS_SUMMARY)).isNotNull().isInstanceOf(List.class);
assertThat(runProcessOutput.getException()).isEmpty();
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("Inserted Id values between 1 and 2");
ProcessSummaryLineInterface okLine = ProcessSummaryAssert.assertThat(runProcessOutput)
.hasLineWithMessageContaining("Person Memory records were inserted")
.hasStatus(Status.OK)
.hasCount(2)
.getLine();
assertEquals(List.of(1, 2), ((ProcessSummaryLine) okLine).getPrimaryKeys());
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("records were processed from the file").hasStatus(Status.INFO);
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("Inserted Id values between 1 and 2").hasStatus(Status.INFO);
////////////////////////////////////
// query for the inserted records //
@ -201,6 +220,86 @@ class BulkInsertFullProcessTest extends BaseTest
/*******************************************************************************
**
*******************************************************************************/
@Test
void testSummaryLinePrimaryKeys() throws Exception
{
assertThat(TestUtils.queryTable(TestUtils.TABLE_NAME_PERSON_MEMORY)).isEmpty();
QContext.getQInstance().getTable(TestUtils.TABLE_NAME_PERSON_MEMORY)
.withCustomizer(TableCustomizers.PRE_INSERT_RECORD, new QCodeReference(PersonWarnOrErrorCustomizer.class));
/////////////////////////////////////////////////////////
// start the process - expect to go to the upload step //
/////////////////////////////////////////////////////////
RunProcessInput runProcessInput = new RunProcessInput();
RunProcessOutput runProcessOutput = startProcess(runProcessInput);
String processUUID = runProcessOutput.getProcessUUID();
continueProcessPostUpload(runProcessInput, processUUID, simulateFileUploadForWarningCase());
continueProcessPostFileMapping(runProcessInput);
continueProcessPostValueMapping(runProcessInput);
runProcessOutput = continueProcessPostReviewScreen(runProcessInput);
ProcessSummaryLineInterface okLine = ProcessSummaryAssert.assertThat(runProcessOutput)
.hasLineWithMessageContaining("Person Memory record was inserted")
.hasStatus(Status.OK)
.hasCount(1)
.getLine();
assertEquals(List.of(1), ((ProcessSummaryLine) okLine).getPrimaryKeys());
ProcessSummaryLineInterface warnTornadoLine = ProcessSummaryAssert.assertThat(runProcessOutput)
.hasLineWithMessageContaining("records were inserted, but had a warning: Tornado warning")
.hasStatus(Status.WARNING)
.hasCount(2)
.getLine();
assertEquals(List.of(2, 3), ((ProcessSummaryLine) warnTornadoLine).getPrimaryKeys());
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("record was inserted, but had a warning: Hurricane warning").hasStatus(Status.WARNING).hasCount(1);
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("records were processed from the file").hasStatus(Status.INFO).hasCount(4);
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("Inserted Id values between 1 and 4").hasStatus(Status.INFO);
}
/*******************************************************************************
**
*******************************************************************************/
@Test
void testSummaryLineErrors() throws Exception
{
assertThat(TestUtils.queryTable(TestUtils.TABLE_NAME_PERSON_MEMORY)).isEmpty();
QContext.getQInstance().getTable(TestUtils.TABLE_NAME_PERSON_MEMORY)
.withCustomizer(TableCustomizers.PRE_INSERT_RECORD, new QCodeReference(PersonWarnOrErrorCustomizer.class));
/////////////////////////////////////////////////////////
// start the process - expect to go to the upload step //
/////////////////////////////////////////////////////////
RunProcessInput runProcessInput = new RunProcessInput();
RunProcessOutput runProcessOutput = startProcess(runProcessInput);
String processUUID = runProcessOutput.getProcessUUID();
continueProcessPostUpload(runProcessInput, processUUID, simulateFileUploadForErrorCase());
continueProcessPostFileMapping(runProcessInput);
continueProcessPostValueMapping(runProcessInput);
runProcessOutput = continueProcessPostReviewScreen(runProcessInput);
ProcessSummaryAssert.assertThat(runProcessOutput).hasLineWithMessageContaining("Person Memory record was inserted.").hasStatus(Status.OK).hasCount(1);
ProcessSummaryAssert.assertThat(runProcessOutput)
.hasLineWithMessageContaining("plane")
.hasStatus(Status.ERROR)
.hasCount(1);
ProcessSummaryAssert.assertThat(runProcessOutput)
.hasLineWithMessageContaining("purifier")
.hasStatus(Status.ERROR)
.hasCount(1);
}
/*******************************************************************************
**
*******************************************************************************/
@ -301,6 +400,47 @@ class BulkInsertFullProcessTest extends BaseTest
/***************************************************************************
**
***************************************************************************/
private static StorageInput simulateFileUploadForWarningCase() throws Exception
{
String storageReference = UUID.randomUUID() + ".csv";
StorageInput storageInput = new StorageInput(TestUtils.TABLE_NAME_MEMORY_STORAGE).withReference(storageReference);
try(OutputStream outputStream = new StorageAction().createOutputStream(storageInput))
{
outputStream.write((getPersonCsvHeaderUsingLabels() + """
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","John","Doe","1980-01-01","john@doe.com","Missouri",42
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","Tornado warning","Doe","1980-01-01","john@doe.com","Missouri",42
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","Tornado warning","Doey","1980-01-01","john@doe.com","Missouri",42
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","Hurricane warning","Doe","1980-01-01","john@doe.com","Missouri",42
""").getBytes());
}
return storageInput;
}
/***************************************************************************
**
***************************************************************************/
private static StorageInput simulateFileUploadForErrorCase() throws Exception
{
String storageReference = UUID.randomUUID() + ".csv";
StorageInput storageInput = new StorageInput(TestUtils.TABLE_NAME_MEMORY_STORAGE).withReference(storageReference);
try(OutputStream outputStream = new StorageAction().createOutputStream(storageInput))
{
outputStream.write((getPersonCsvHeaderUsingLabels() + """
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","John","Doe","1980-01-01","john@doe.com","Missouri",42
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","not-pre-Error plane","Doe","1980-01-01","john@doe.com","Missouri",42
"0","2021-10-26 14:39:37","2021-10-26 14:39:37","Error purifier","Doe","1980-01-01","john@doe.com","Missouri",42
""").getBytes());
}
return storageInput;
}
/***************************************************************************
**
***************************************************************************/
@ -331,4 +471,47 @@ class BulkInsertFullProcessTest extends BaseTest
)));
}
/***************************************************************************
**
***************************************************************************/
public static class PersonWarnOrErrorCustomizer implements TableCustomizerInterface
{
/***************************************************************************
**
***************************************************************************/
@Override
public AbstractPreInsertCustomizer.WhenToRun whenToRunPreInsert(InsertInput insertInput, boolean isPreview)
{
return AbstractPreInsertCustomizer.WhenToRun.BEFORE_ALL_VALIDATIONS;
}
/***************************************************************************
**
***************************************************************************/
@Override
public List<QRecord> preInsert(InsertInput insertInput, List<QRecord> records, boolean isPreview) throws QException
{
for(QRecord record : records)
{
if(record.getValueString("firstName").toLowerCase().contains("warn"))
{
record.addWarning(new QWarningMessage(record.getValueString("firstName")));
}
else if(record.getValueString("firstName").toLowerCase().contains("error"))
{
if(isPreview && record.getValueString("firstName").toLowerCase().contains("not-pre-error"))
{
continue;
}
record.addError(new BadInputStatusMessage(record.getValueString("firstName")));
}
}
return records;
}
}
}