QQQ-37 Redo bulk processes in streamed-etl mode

This commit is contained in:
2022-09-07 16:59:42 -05:00
parent 1c75df3a09
commit b01758879c
27 changed files with 1430 additions and 1036 deletions

View File

@ -75,33 +75,56 @@ public class RDBMSDeleteAction extends AbstractRDBMSAction implements DeleteInte
// - else if there's a list, try to delete it, but upon error: //
// - - do a single-delete for each entry in the list. //
/////////////////////////////////////////////////////////////////////////////////
try(Connection connection = getConnection(deleteInput))
try
{
///////////////////////////////////////////////////////////////////////////////////////////////
// if there's a query filter, try to do a single-delete with that filter in the WHERE clause //
///////////////////////////////////////////////////////////////////////////////////////////////
if(deleteInput.getQueryFilter() != null)
Connection connection;
boolean needToCloseConnection = false;
if(deleteInput.getTransaction() != null && deleteInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction)
{
try
{
deleteInput.getAsyncJobCallback().updateStatus("Running bulk delete via query filter.");
deleteQueryFilter(connection, deleteInput, deleteOutput);
return (deleteOutput);
}
catch(Exception e)
{
deleteInput.getAsyncJobCallback().updateStatus("Error running bulk delete via filter. Fetching keys for individual deletes.");
LOG.info("Exception trying to delete by filter query. Moving on to deleting by id now.");
deleteInput.setPrimaryKeys(DeleteAction.getPrimaryKeysFromQueryFilter(deleteInput));
}
LOG.debug("Using connection from updateInput [" + rdbmsTransaction.getConnection() + "]");
connection = rdbmsTransaction.getConnection();
}
else
{
connection = getConnection(deleteInput);
needToCloseConnection = true;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// at this point, there either wasn't a query filter, or there was an error executing it (in which case, the query should //
// have been converted to a list of primary keys in the deleteInput). so, proceed now by deleting a list of pkeys. //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
deleteList(connection, deleteInput, deleteOutput);
return (deleteOutput);
try
{
///////////////////////////////////////////////////////////////////////////////////////////////
// if there's a query filter, try to do a single-delete with that filter in the WHERE clause //
///////////////////////////////////////////////////////////////////////////////////////////////
if(deleteInput.getQueryFilter() != null)
{
try
{
deleteInput.getAsyncJobCallback().updateStatus("Running bulk delete via query filter.");
deleteQueryFilter(connection, deleteInput, deleteOutput);
return (deleteOutput);
}
catch(Exception e)
{
deleteInput.getAsyncJobCallback().updateStatus("Error running bulk delete via filter. Fetching keys for individual deletes.");
LOG.info("Exception trying to delete by filter query. Moving on to deleting by id now.");
deleteInput.setPrimaryKeys(DeleteAction.getPrimaryKeysFromQueryFilter(deleteInput));
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// at this point, there either wasn't a query filter, or there was an error executing it (in which case, the query should //
// have been converted to a list of primary keys in the deleteInput). so, proceed now by deleting a list of pkeys. //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
deleteList(connection, deleteInput, deleteOutput);
return (deleteOutput);
}
finally
{
if(needToCloseConnection)
{
connection.close();
}
}
}
catch(Exception e)
{
@ -117,6 +140,13 @@ public class RDBMSDeleteAction extends AbstractRDBMSAction implements DeleteInte
private void deleteList(Connection connection, DeleteInput deleteInput, DeleteOutput deleteOutput)
{
List<Serializable> primaryKeys = deleteInput.getPrimaryKeys();
if(primaryKeys.size() == 0)
{
/////////////////////////
// noop - just return. //
/////////////////////////
return;
}
if(primaryKeys.size() == 1)
{
doDeleteOne(connection, deleteInput.getTable(), primaryKeys.get(0), deleteOutput);