mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 05:01:07 +00:00
Fixes for performance of sqs (batch mode), plus bug in deleteAction by query-filter (with test that proves it)
This commit is contained in:
@ -22,12 +22,15 @@
|
||||
package com.kingsrook.qqq.backend.core.actions.queues;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import com.amazonaws.auth.AWSStaticCredentialsProvider;
|
||||
import com.amazonaws.auth.BasicAWSCredentials;
|
||||
import com.amazonaws.services.sqs.AmazonSQS;
|
||||
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageRequest;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
|
||||
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
|
||||
import com.amazonaws.services.sqs.model.Message;
|
||||
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
|
||||
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
|
||||
@ -88,8 +91,13 @@ public class SQSQueuePoller implements Runnable
|
||||
|
||||
while(true)
|
||||
{
|
||||
///////////////////////////////
|
||||
// fetch a batch of messages //
|
||||
///////////////////////////////
|
||||
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest();
|
||||
receiveMessageRequest.setQueueUrl(queueUrl);
|
||||
receiveMessageRequest.setMaxNumberOfMessages(10);
|
||||
receiveMessageRequest.setWaitTimeSeconds(20); // help urge SQS to query multiple servers and find more messages
|
||||
ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest);
|
||||
if(receiveMessageResult.getMessages().isEmpty())
|
||||
{
|
||||
@ -98,31 +106,61 @@ public class SQSQueuePoller implements Runnable
|
||||
}
|
||||
LOG.debug(receiveMessageResult.getMessages().size() + " messages received. Processing.");
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// extract data from the messages into list of bodies to pass into process, and list of delete-batch-inputs //
|
||||
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
List<DeleteMessageBatchRequestEntry> deleteRequestEntries = new ArrayList<>();
|
||||
ArrayList<String> bodies = new ArrayList<>();
|
||||
int i = 0;
|
||||
for(Message message : receiveMessageResult.getMessages())
|
||||
{
|
||||
String body = message.getBody();
|
||||
bodies.add(message.getBody());
|
||||
deleteRequestEntries.add(new DeleteMessageBatchRequestEntry(String.valueOf(i++), message.getReceiptHandle()));
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
// run the process, in a try-catch, so even if it fails, our loop keeps going. //
|
||||
// the messages in a failed process will get re-delivered, to try-again, up to the //
|
||||
// number of times configured in AWS //
|
||||
/////////////////////////////////////////////////////////////////////////////////////
|
||||
try
|
||||
{
|
||||
RunProcessInput runProcessInput = new RunProcessInput(qInstance);
|
||||
runProcessInput.setSession(sessionSupplier.get());
|
||||
runProcessInput.setProcessName(queueMetaData.getProcessName());
|
||||
runProcessInput.setFrontendStepBehavior(RunProcessInput.FrontendStepBehavior.SKIP);
|
||||
runProcessInput.addValue("body", body);
|
||||
runProcessInput.addValue("bodies", bodies);
|
||||
|
||||
RunProcessAction runProcessAction = new RunProcessAction();
|
||||
RunProcessOutput runProcessOutput = runProcessAction.execute(runProcessInput);
|
||||
|
||||
/////////////////////////////////
|
||||
// todo - what of exceptions?? //
|
||||
/////////////////////////////////
|
||||
|
||||
String receiptHandle = message.getReceiptHandle();
|
||||
sqs.deleteMessage(new DeleteMessageRequest(queueUrl, receiptHandle));
|
||||
////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// if there was an exception returned by the process (e.g., thrown in backend step), then //
|
||||
// warn and leave the messages for re-processing. //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////
|
||||
if(runProcessOutput.getException().isPresent())
|
||||
{
|
||||
LOG.warn("Exception returned by process when handling SQS Messages. They will not be deleted from the queue.", runProcessOutput.getException().get());
|
||||
}
|
||||
else
|
||||
{
|
||||
///////////////////////////////////////////////
|
||||
// else, if no exception, do a batch delete. //
|
||||
///////////////////////////////////////////////
|
||||
sqs.deleteMessageBatch(new DeleteMessageBatchRequest()
|
||||
.withQueueUrl(queueUrl)
|
||||
.withEntries(deleteRequestEntries));
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
LOG.warn("Error receiving SQS Messages.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
LOG.warn("Error receiving SQS Message", e);
|
||||
LOG.warn("Error running SQS Queue Poller", e);
|
||||
}
|
||||
finally
|
||||
{
|
||||
|
Reference in New Issue
Block a user