From 0e68bf1e72df5f98514cb2f131c6e4a41e0714fa Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Thu, 15 Dec 2022 16:13:56 -0600 Subject: [PATCH] Hotfix - sqs batch mode, and rdbms delete-by-filter --- .../core/actions/queues/SQSQueuePoller.java | 58 ++++++++-- .../rdbms/actions/RDBMSDeleteAction.java | 4 +- .../rdbms/actions/RDBMSDeleteActionTest.java | 107 +++++++++++++++++- ...rime-test-database-parent-child-tables.sql | 3 +- 4 files changed, 156 insertions(+), 16 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java index b7349432..efb24d22 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java @@ -22,12 +22,15 @@ package com.kingsrook.qqq.backend.core.actions.queues; +import java.util.ArrayList; +import java.util.List; import java.util.function.Supplier; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.DeleteMessageRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest; +import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry; import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.ReceiveMessageRequest; import com.amazonaws.services.sqs.model.ReceiveMessageResult; @@ -88,8 +91,13 @@ public class SQSQueuePoller implements Runnable while(true) { + /////////////////////////////// + // fetch a batch of messages // + /////////////////////////////// ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(); receiveMessageRequest.setQueueUrl(queueUrl); + receiveMessageRequest.setMaxNumberOfMessages(10); + receiveMessageRequest.setWaitTimeSeconds(20); // help urge SQS to query multiple servers and find more messages ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); if(receiveMessageResult.getMessages().isEmpty()) { @@ -98,31 +106,61 @@ public class SQSQueuePoller implements Runnable } LOG.debug(receiveMessageResult.getMessages().size() + " messages received. Processing."); + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // extract data from the messages into list of bodies to pass into process, and list of delete-batch-inputs // + ////////////////////////////////////////////////////////////////////////////////////////////////////////////// + List deleteRequestEntries = new ArrayList<>(); + ArrayList bodies = new ArrayList<>(); + int i = 0; for(Message message : receiveMessageResult.getMessages()) { - String body = message.getBody(); + bodies.add(message.getBody()); + deleteRequestEntries.add(new DeleteMessageBatchRequestEntry(String.valueOf(i++), message.getReceiptHandle())); + } + ///////////////////////////////////////////////////////////////////////////////////// + // run the process, in a try-catch, so even if it fails, our loop keeps going. // + // the messages in a failed process will get re-delivered, to try-again, up to the // + // number of times configured in AWS // + ///////////////////////////////////////////////////////////////////////////////////// + try + { RunProcessInput runProcessInput = new RunProcessInput(qInstance); runProcessInput.setSession(sessionSupplier.get()); runProcessInput.setProcessName(queueMetaData.getProcessName()); runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP); - runProcessInput.addValue("body", body); + runProcessInput.addValue("bodies", bodies); RunProcessAction runProcessAction = new RunProcessAction(); RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput); - ///////////////////////////////// - // todo - what of exceptions?? // - ///////////////////////////////// - - String receiptHandle = message.getReceiptHandle(); - sqs.deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle)); + //////////////////////////////////////////////////////////////////////////////////////////// + // if there was an exception returned by the process (e.g., thrown in backend step), then // + // warn and leave the messages for re-processing. // + //////////////////////////////////////////////////////////////////////////////////////////// + if(runProcessOutput.getException().isPresent()) + { + LOG.warn("Exception returned by process when handling SQS Messages. They will not be deleted from the queue.", runProcessOutput.getException().get()); + } + else + { + /////////////////////////////////////////////// + // else, if no exception, do a batch delete. // + /////////////////////////////////////////////// + sqs.deleteMessageBatch(new DeleteMessageBatchRequest() + .withQueueUrl(queueUrl) + .withEntries(deleteRequestEntries)); + } + } + catch(Exception e) + { + LOG.warn("Error receiving SQS Messages.", e); } } } catch(Exception e) { - LOG.warn("Error receiving SQS Message", e); + LOG.warn("Error running SQS Queue Poller", e); } finally { diff --git a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteAction.java b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteAction.java index 36efffa4..5de4634b 100644 --- a/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteAction.java +++ b/qqq-backend-module-rdbms/src/main/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteAction.java @@ -188,7 +188,7 @@ public class RDBMSDeleteAction extends AbstractRDBMSAction implements DeleteInte String sql = "DELETE FROM " + escapeIdentifier(tableName) + " WHERE " - + primaryKeyName + " = ?"; + + escapeIdentifier(primaryKeyName) + " = ?"; try { @@ -263,7 +263,7 @@ public class RDBMSDeleteAction extends AbstractRDBMSAction implements DeleteInte // todo sql customization - can edit sql and/or param list? String sql = "DELETE FROM " - + tableName + + escapeIdentifier(tableName) + " AS " + escapeIdentifier(table.getName()) + " WHERE " + whereClause; diff --git a/qqq-backend-module-rdbms/src/test/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteActionTest.java b/qqq-backend-module-rdbms/src/test/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteActionTest.java index d006bf4c..8f7ee125 100644 --- a/qqq-backend-module-rdbms/src/test/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteActionTest.java +++ b/qqq-backend-module-rdbms/src/test/java/com/kingsrook/qqq/backend/module/rdbms/actions/RDBMSDeleteActionTest.java @@ -26,6 +26,9 @@ import java.util.List; import java.util.Map; import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput; import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteOutput; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria; +import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter; import com.kingsrook.qqq.backend.core.model.metadata.QInstance; import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData; import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldType; @@ -164,9 +167,9 @@ public class RDBMSDeleteActionTest extends RDBMSActionTest DeleteOutput deleteResult = new RDBMSDeleteAction().execute(deleteInput); - //////////////////////////////////////////////////////////////////////////////// - // assert that 6 queries ran - the initial delete (which failed), then 6 more // - //////////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////////////////// + // assert that 6 queries ran - the initial delete (which failed), then 5 more deletes // + //////////////////////////////////////////////////////////////////////////////////////// QueryManager.setCollectStatistics(false); Map queryStats = QueryManager.getStatistics(); assertEquals(6, queryStats.get(QueryManager.STAT_QUERIES_RAN), "Number of queries ran"); @@ -191,6 +194,104 @@ public class RDBMSDeleteActionTest extends RDBMSActionTest + /******************************************************************************* + ** + *******************************************************************************/ + @Test + public void testDeleteByFilterThatJustWorks() throws Exception + { + ////////////////////////////////////////////////////////////////// + // load the parent-child tables, with foreign keys and instance // + ////////////////////////////////////////////////////////////////// + TestUtils.primeTestDatabase("prime-test-database-parent-child-tables.sql"); + DeleteInput deleteInput = initChildTableInstanceAndDeleteRequest(); + + //////////////////////////////////////////////////////////////////////// + // try to delete the records without a foreign key that'll block them // + //////////////////////////////////////////////////////////////////////// + deleteInput.setQueryFilter(new QQueryFilter(new QFilterCriteria("id", QCriteriaOperator.IN, List.of(2, 4, 5)))); + + QueryManager.setCollectStatistics(true); + QueryManager.resetStatistics(); + + DeleteOutput deleteResult = new RDBMSDeleteAction().execute(deleteInput); + + ////////////////////////////////// + // assert that just 1 query ran // + ////////////////////////////////// + QueryManager.setCollectStatistics(false); + Map queryStats = QueryManager.getStatistics(); + assertEquals(1, queryStats.get(QueryManager.STAT_QUERIES_RAN), "Number of queries ran"); + assertEquals(3, deleteResult.getDeletedRecordCount(), "Should get back that 3 were deleted"); + + runTestSql("SELECT id FROM child_table", (rs -> { + int rowsFound = 0; + while(rs.next()) + { + rowsFound++; + /////////////////////////////////////////// + // child_table rows 1 & 3 should survive // + /////////////////////////////////////////// + assertTrue(rs.getInt(1) == 1 || rs.getInt(1) == 3); + } + assertEquals(2, rowsFound); + })); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + @Test + public void testDeleteByFilterWhereForeignKeyBlocksSome() throws Exception + { + ////////////////////////////////////////////////////////////////// + // load the parent-child tables, with foreign keys and instance // + ////////////////////////////////////////////////////////////////// + TestUtils.primeTestDatabase("prime-test-database-parent-child-tables.sql"); + DeleteInput deleteInput = initChildTableInstanceAndDeleteRequest(); + + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // try to delete all of the child records - 2 should fail, because they are referenced by parent_table.child_id // + ////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + deleteInput.setQueryFilter(new QQueryFilter(new QFilterCriteria("id", QCriteriaOperator.IN, List.of(1, 2, 3, 4, 5)))); + + QueryManager.setCollectStatistics(true); + QueryManager.resetStatistics(); + + DeleteOutput deleteResult = new RDBMSDeleteAction().execute(deleteInput); + + /////////////////////////////////////////////////////////////////////////////////////////////////////// + // assert that 8 queries ran - the initial delete (which failed), then 1 to look up the ids // + // from that query, another to try to delete all those ids (also fails), and finally 5 deletes by id // + // todo - maybe we shouldn't do that 2nd "try to delete 'em all by id"... why would it ever work, // + // but the original filter query didn't (other than malformed SQL)? // + /////////////////////////////////////////////////////////////////////////////////////////////////////// + QueryManager.setCollectStatistics(false); + Map queryStats = QueryManager.getStatistics(); + assertEquals(8, queryStats.get(QueryManager.STAT_QUERIES_RAN), "Number of queries ran"); + + assertEquals(2, deleteResult.getRecordsWithErrors().size(), "Should get back the 2 records with errors"); + assertTrue(deleteResult.getRecordsWithErrors().stream().noneMatch(r -> r.getErrors().isEmpty()), "All we got back should have errors"); + assertEquals(3, deleteResult.getDeletedRecordCount(), "Should get back that 3 were deleted"); + + runTestSql("SELECT id FROM child_table", (rs -> { + int rowsFound = 0; + while(rs.next()) + { + rowsFound++; + /////////////////////////////////////////// + // child_table rows 1 & 3 should survive // + /////////////////////////////////////////// + assertTrue(rs.getInt(1) == 1 || rs.getInt(1) == 3); + } + assertEquals(2, rowsFound); + })); + } + + + /******************************************************************************* ** *******************************************************************************/ diff --git a/qqq-backend-module-rdbms/src/test/resources/prime-test-database-parent-child-tables.sql b/qqq-backend-module-rdbms/src/test/resources/prime-test-database-parent-child-tables.sql index 7acb63a0..294a399f 100644 --- a/qqq-backend-module-rdbms/src/test/resources/prime-test-database-parent-child-tables.sql +++ b/qqq-backend-module-rdbms/src/test/resources/prime-test-database-parent-child-tables.sql @@ -19,7 +19,9 @@ -- along with this program. If not, see . -- +DROP TABLE IF EXISTS parent_table; DROP TABLE IF EXISTS child_table; + CREATE TABLE child_table ( id INT AUTO_INCREMENT primary key, @@ -32,7 +34,6 @@ INSERT INTO child_table (id, name) VALUES (3, 'Johnny'); INSERT INTO child_table (id, name) VALUES (4, 'Gracie'); INSERT INTO child_table (id, name) VALUES (5, 'Suzie'); -DROP TABLE IF EXISTS parent_table; CREATE TABLE parent_table ( id INT AUTO_INCREMENT PRIMARY KEY,