Hotfix - sqs batch mode, and rdbms delete-by-filter

This commit is contained in:
2022-12-15 16:13:56 -06:00
parent 40afb629e7
commit 0e68bf1e72
4 changed files with 156 additions and 16 deletions

View File

@ -22,12 +22,15 @@
package com.kingsrook.qqq.backend.core.actions.queues;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
@ -88,8 +91,13 @@ public class SQSQueuePoller implements Runnable
while(true)
{
///////////////////////////////
// fetch a batch of messages //
///////////////////////////////
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
receiveMessageRequest.setQueueUrl(queueUrl);
receiveMessageRequest.setMaxNumberOfMessages(10);
receiveMessageRequest.setWaitTimeSeconds(20); // help urge SQS to query multiple servers and find more messages
ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest);
if(receiveMessageResult.getMessages().isEmpty())
{
@ -98,31 +106,61 @@ public class SQSQueuePoller implements Runnable
}
LOG.debug(receiveMessageResult.getMessages().size() + " messages received. Processing.");
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
// extract data from the messages into list of bodies to pass into process, and list of delete-batch-inputs //
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
ArrayList<String> bodies = new ArrayList<>();
int i = 0;
for(Message message : receiveMessageResult.getMessages())
{
String body = message.getBody();
bodies.add(message.getBody());
deleteRequestEntries.add(new DeleteMessageBatchRequestEntry(String.valueOf(i++), message.getReceiptHandle()));
}
/////////////////////////////////////////////////////////////////////////////////////
// run the process, in a try-catch, so even if it fails, our loop keeps going. //
// the messages in a failed process will get re-delivered, to try-again, up to the //
// number of times configured in AWS //
/////////////////////////////////////////////////////////////////////////////////////
try
{
RunProcessInput runProcessInput = new RunProcessInput(qInstance);
runProcessInput.setSession(sessionSupplier.get());
runProcessInput.setProcessName(queueMetaData.getProcessName());
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
runProcessInput.addValue("body", body);
runProcessInput.addValue("bodies", bodies);
RunProcessAction runProcessAction = new RunProcessAction();
RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput);
/////////////////////////////////
// todo - what of exceptions?? //
/////////////////////////////////
String receiptHandle = message.getReceiptHandle();
sqs.deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle));
////////////////////////////////////////////////////////////////////////////////////////////
// if there was an exception returned by the process (e.g., thrown in backend step), then //
// warn and leave the messages for re-processing. //
////////////////////////////////////////////////////////////////////////////////////////////
if(runProcessOutput.getException().isPresent())
{
LOG.warn("Exception returned by process when handling SQS Messages. They will not be deleted from the queue.", runProcessOutput.getException().get());
}
else
{
///////////////////////////////////////////////
// else, if no exception, do a batch delete. //
///////////////////////////////////////////////
sqs.deleteMessageBatch(new DeleteMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(deleteRequestEntries));
}
}
catch(Exception e)
{
LOG.warn("Error receiving SQS Messages.", e);
}
}
}
catch(Exception e)
{
LOG.warn("Error receiving SQS Message", e);
LOG.warn("Error running SQS Queue Poller", e);
}
finally
{

View File

@ -188,7 +188,7 @@ public class RDBMSDeleteAction extends AbstractRDBMSAction implements DeleteInte
String sql = "DELETE FROM "
+ escapeIdentifier(tableName)
+ " WHERE "
+ primaryKeyName + " = ?";
+ escapeIdentifier(primaryKeyName) + " = ?";
try
{
@ -263,7 +263,7 @@ public class RDBMSDeleteAction extends AbstractRDBMSAction implements DeleteInte
// todo sql customization - can edit sql and/or param list?
String sql = "DELETE FROM "
+ tableName
+ escapeIdentifier(tableName) + " AS " + escapeIdentifier(table.getName())
+ " WHERE "
+ whereClause;

View File

@ -26,6 +26,9 @@ import java.util.List;
import java.util.Map;
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType;
@ -164,9 +167,9 @@ public class RDBMSDeleteActionTest extends RDBMSActionTest
DeleteOutput deleteResult = new RDBMSDeleteAction().execute(deleteInput);
////////////////////////////////////////////////////////////////////////////////
// assert that 6 queries ran - the initial delete (which failed), then 6 more //
////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////////////
// assert that 6 queries ran - the initial delete (which failed), then 5 more deletes //
////////////////////////////////////////////////////////////////////////////////////////
QueryManager.setCollectStatistics(false);
Map<String, Integer> queryStats = QueryManager.getStatistics();
assertEquals(6, queryStats.get(QueryManager.STAT_QUERIES_RAN), "Number of queries ran");
@ -191,6 +194,104 @@ public class RDBMSDeleteActionTest extends RDBMSActionTest
/*******************************************************************************
**
*******************************************************************************/
@Test
public void testDeleteByFilterThatJustWorks() throws Exception
{
//////////////////////////////////////////////////////////////////
// load the parent-child tables, with foreign keys and instance //
//////////////////////////////////////////////////////////////////
TestUtils.primeTestDatabase("prime-test-database-parent-child-tables.sql");
DeleteInput deleteInput = initChildTableInstanceAndDeleteRequest();
////////////////////////////////////////////////////////////////////////
// try to delete the records without a foreign key that'll block them //
////////////////////////////////////////////////////////////////////////
deleteInput.setQueryFilter(new QQueryFilter(new QFilterCriteria("id", QCriteriaOperator.IN, List.of(2, 4, 5))));
QueryManager.setCollectStatistics(true);
QueryManager.resetStatistics();
DeleteOutput deleteResult = new RDBMSDeleteAction().execute(deleteInput);
//////////////////////////////////
// assert that just 1 query ran //
//////////////////////////////////
QueryManager.setCollectStatistics(false);
Map<String, Integer> queryStats = QueryManager.getStatistics();
assertEquals(1, queryStats.get(QueryManager.STAT_QUERIES_RAN), "Number of queries ran");
assertEquals(3, deleteResult.getDeletedRecordCount(), "Should get back that 3 were deleted");
runTestSql("SELECT id FROM child_table", (rs -> {
int rowsFound = 0;
while(rs.next())
{
rowsFound++;
///////////////////////////////////////////
// child_table rows 1 & 3 should survive //
///////////////////////////////////////////
assertTrue(rs.getInt(1) == 1 || rs.getInt(1) == 3);
}
assertEquals(2, rowsFound);
}));
}
/*******************************************************************************
**
*******************************************************************************/
@Test
public void testDeleteByFilterWhereForeignKeyBlocksSome() throws Exception
{
//////////////////////////////////////////////////////////////////
// load the parent-child tables, with foreign keys and instance //
//////////////////////////////////////////////////////////////////
TestUtils.primeTestDatabase("prime-test-database-parent-child-tables.sql");
DeleteInput deleteInput = initChildTableInstanceAndDeleteRequest();
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// try to delete all of the child records - 2 should fail, because they are referenced by parent_table.child_id //
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
deleteInput.setQueryFilter(new QQueryFilter(new QFilterCriteria("id", QCriteriaOperator.IN, List.of(1, 2, 3, 4, 5))));
QueryManager.setCollectStatistics(true);
QueryManager.resetStatistics();
DeleteOutput deleteResult = new RDBMSDeleteAction().execute(deleteInput);
///////////////////////////////////////////////////////////////////////////////////////////////////////
// assert that 8 queries ran - the initial delete (which failed), then 1 to look up the ids //
// from that query, another to try to delete all those ids (also fails), and finally 5 deletes by id //
// todo - maybe we shouldn't do that 2nd "try to delete 'em all by id"... why would it ever work, //
// but the original filter query didn't (other than malformed SQL)? //
///////////////////////////////////////////////////////////////////////////////////////////////////////
QueryManager.setCollectStatistics(false);
Map<String, Integer> queryStats = QueryManager.getStatistics();
assertEquals(8, queryStats.get(QueryManager.STAT_QUERIES_RAN), "Number of queries ran");
assertEquals(2, deleteResult.getRecordsWithErrors().size(), "Should get back the 2 records with errors");
assertTrue(deleteResult.getRecordsWithErrors().stream().noneMatch(r -> r.getErrors().isEmpty()), "All we got back should have errors");
assertEquals(3, deleteResult.getDeletedRecordCount(), "Should get back that 3 were deleted");
runTestSql("SELECT id FROM child_table", (rs -> {
int rowsFound = 0;
while(rs.next())
{
rowsFound++;
///////////////////////////////////////////
// child_table rows 1 & 3 should survive //
///////////////////////////////////////////
assertTrue(rs.getInt(1) == 1 || rs.getInt(1) == 3);
}
assertEquals(2, rowsFound);
}));
}
/*******************************************************************************
**
*******************************************************************************/

View File

@ -19,7 +19,9 @@
-- along with this program. If not, see <https://www.gnu.org/licenses/>.
--
DROP TABLE IF EXISTS parent_table;
DROP TABLE IF EXISTS child_table;
CREATE TABLE child_table
(
id INT AUTO_INCREMENT primary key,
@ -32,7 +34,6 @@ INSERT INTO child_table (id, name) VALUES (3, 'Johnny');
INSERT INTO child_table (id, name) VALUES (4, 'Gracie');
INSERT INTO child_table (id, name) VALUES (5, 'Suzie');
DROP TABLE IF EXISTS parent_table;
CREATE TABLE parent_table
(
id INT AUTO_INCREMENT PRIMARY KEY,