diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java index ea42058b..ea23172c 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePoller.java @@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.actions.queues; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.function.Supplier; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; @@ -41,6 +42,8 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessInput; import com.kingsrook.qqq.backend.core.model.actions.processes.RunProcessOutput; import com.kingsrook.qqq.backend.core.model.metadata.QInstance; import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSPollerSettings; +import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueMetaData; import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData; import com.kingsrook.qqq.backend.core.model.session.QSession; @@ -90,15 +93,17 @@ public class SQSQueuePoller implements Runnable } queueUrl += queueMetaData.getQueueName(); - while(true) + SQSPollerSettings sqsPollerSettings = getSqsPollerSettings(queueProviderMetaData, queueMetaData); + + for(int loop = 0; loop < sqsPollerSettings.getMaxLoops(); loop++) { /////////////////////////////// // fetch a batch of messages // /////////////////////////////// ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(); receiveMessageRequest.setQueueUrl(queueUrl); - receiveMessageRequest.setMaxNumberOfMessages(10); - receiveMessageRequest.setWaitTimeSeconds(20); // help urge SQS to query multiple servers and find more messages + receiveMessageRequest.setMaxNumberOfMessages(sqsPollerSettings.getMaxNumberOfMessages()); + receiveMessageRequest.setWaitTimeSeconds(sqsPollerSettings.getWaitTimeSeconds()); // larger value (e.g., 20) can help urge SQS to query multiple servers and find more messages ReceiveMessageResult receiveMessageResult = sqs.receiveMessage(receiveMessageRequest); if(receiveMessageResult.getMessages().isEmpty()) { @@ -177,6 +182,47 @@ public class SQSQueuePoller implements Runnable + /******************************************************************************* + ** For a given queueProvider and queue, get the poller settings to use (using + ** default values if none are set at either level). + *******************************************************************************/ + static SQSPollerSettings getSqsPollerSettings(SQSQueueProviderMetaData queueProviderMetaData, QQueueMetaData queueMetaData) + { + ///////////////////////////////// + // start with default settings // + ///////////////////////////////// + SQSPollerSettings sqsPollerSettings = new SQSPollerSettings() + .withMaxLoops(Integer.MAX_VALUE) + .withMaxNumberOfMessages(10) + .withWaitTimeSeconds(20); + + ///////////////////////////////////////////////////////////////////// + // if the queue provider has settings, let them overwrite defaults // + ///////////////////////////////////////////////////////////////////// + if(queueProviderMetaData != null && queueProviderMetaData.getPollerSettings() != null) + { + SQSPollerSettings providerSettings = queueProviderMetaData.getPollerSettings(); + sqsPollerSettings.setMaxLoops(Objects.requireNonNullElse(providerSettings.getMaxLoops(), sqsPollerSettings.getMaxLoops())); + sqsPollerSettings.setMaxNumberOfMessages(Objects.requireNonNullElse(providerSettings.getMaxNumberOfMessages(), sqsPollerSettings.getMaxNumberOfMessages())); + sqsPollerSettings.setWaitTimeSeconds(Objects.requireNonNullElse(providerSettings.getWaitTimeSeconds(), sqsPollerSettings.getWaitTimeSeconds())); + } + + //////////////////////////////////////////////////////////// + // if the queue has settings, let them overwrite defaults // + //////////////////////////////////////////////////////////// + if(queueMetaData instanceof SQSQueueMetaData sqsQueueMetaData && sqsQueueMetaData.getPollerSettings() != null) + { + SQSPollerSettings providerSettings = sqsQueueMetaData.getPollerSettings(); + sqsPollerSettings.setMaxLoops(Objects.requireNonNullElse(providerSettings.getMaxLoops(), sqsPollerSettings.getMaxLoops())); + sqsPollerSettings.setMaxNumberOfMessages(Objects.requireNonNullElse(providerSettings.getMaxNumberOfMessages(), sqsPollerSettings.getMaxNumberOfMessages())); + sqsPollerSettings.setWaitTimeSeconds(Objects.requireNonNullElse(providerSettings.getWaitTimeSeconds(), sqsPollerSettings.getWaitTimeSeconds())); + } + + return sqsPollerSettings; + } + + + /******************************************************************************* ** Setter for queueProviderMetaData ** diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/instances/QInstanceValidator.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/instances/QInstanceValidator.java index 9ece09aa..e8b4812a 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/instances/QInstanceValidator.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/instances/QInstanceValidator.java @@ -79,6 +79,8 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QStepMetaData; import com.kingsrook.qqq.backend.core.model.metadata.processes.QSupplementalProcessMetaData; import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData; import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueProviderMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.queues.QueueType; +import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueMetaData; import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData; import com.kingsrook.qqq.backend.core.model.metadata.reporting.QReportDataSource; import com.kingsrook.qqq.backend.core.model.metadata.reporting.QReportField; @@ -439,11 +441,30 @@ public class QInstanceValidator if(queueProvider instanceof SQSQueueProviderMetaData sqsQueueProvider) { + if(queueProvider.getType() != null) + { + assertCondition(queueProvider.getType().equals(QueueType.SQS), "Inconsistent Type/class given for queueProvider: " + name + " (SQSQueueProviderMetaData is not allowed for type " + queueProvider.getType() + ")"); + } + assertCondition(StringUtils.hasContent(sqsQueueProvider.getAccessKey()), "Missing accessKey for SQSQueueProvider: " + name); assertCondition(StringUtils.hasContent(sqsQueueProvider.getSecretKey()), "Missing secretKey for SQSQueueProvider: " + name); assertCondition(StringUtils.hasContent(sqsQueueProvider.getBaseURL()), "Missing baseURL for SQSQueueProvider: " + name); assertCondition(StringUtils.hasContent(sqsQueueProvider.getRegion()), "Missing region for SQSQueueProvider: " + name); } + else if(queueProvider.getClass().equals(QQueueProviderMetaData.class)) + { + ///////////////////////////////////////////////////////////////////// + // this just means a subtype wasn't used, so, it should be allowed // + // (unless we had a case where a type required a subtype?) // + ///////////////////////////////////////////////////////////////////// + } + else + { + if(queueProvider.getType() != null) + { + assertCondition(!queueProvider.getType().equals(QueueType.SQS), "Inconsistent Type/class given for queueProvider: " + name + " (" + queueProvider.getClass().getSimpleName() + " is not allowed for type " + queueProvider.getType() + ")"); + } + } runPlugins(QQueueProviderMetaData.class, queueProvider, qInstance); }); @@ -454,7 +475,27 @@ public class QInstanceValidator qInstance.getQueues().forEach((name, queue) -> { assertCondition(Objects.equals(name, queue.getName()), "Inconsistent naming for queue: " + name + "/" + queue.getName() + "."); - assertCondition(qInstance.getQueueProvider(queue.getProviderName()) != null, "Unrecognized queue providerName for queue: " + name); + + QQueueProviderMetaData queueProvider = qInstance.getQueueProvider(queue.getProviderName()); + if(assertCondition(queueProvider != null, "Unrecognized queue providerName for queue: " + name)) + { + if(queue instanceof SQSQueueMetaData) + { + assertCondition(queueProvider.getType().equals(QueueType.SQS), "Inconsistent class given for queueMetaData: " + name + " (SQSQueueMetaData is not allowed for queue provider of type " + queueProvider.getType() + ")"); + } + else if(queue.getClass().equals(QQueueMetaData.class)) + { + //////////////////////////////////////////////////////////////////// + // this just means a subtype wasn't used, so, it should be // + // allowed (unless we had a case where a type required a subtype? // + //////////////////////////////////////////////////////////////////// + } + else + { + assertCondition(!queueProvider.getType().equals(QueueType.SQS), "Inconsistent class given for queueProvider: " + name + " (" + queue.getClass().getSimpleName() + " is not allowed for type " + queueProvider.getType() + ")"); + } + } + assertCondition(StringUtils.hasContent(queue.getQueueName()), "Missing queueName for queue: " + name); if(assertCondition(StringUtils.hasContent(queue.getProcessName()), "Missing processName for queue: " + name)) { diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSPollerSettings.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSPollerSettings.java new file mode 100644 index 00000000..f3c35d1d --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSPollerSettings.java @@ -0,0 +1,128 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.model.metadata.queues; + + +/******************************************************************************* + ** settings that can be applied to either an SQSQueue or an SQSQueueProvider, + ** to control what the SQSQueuePoller does when it receives from AWS. + *******************************************************************************/ +public class SQSPollerSettings +{ + private Integer maxNumberOfMessages; + private Integer waitTimeSeconds; + private Integer maxLoops; + + + + /******************************************************************************* + ** Getter for maxNumberOfMessages + *******************************************************************************/ + public Integer getMaxNumberOfMessages() + { + return (this.maxNumberOfMessages); + } + + + + /******************************************************************************* + ** Setter for maxNumberOfMessages + *******************************************************************************/ + public void setMaxNumberOfMessages(Integer maxNumberOfMessages) + { + this.maxNumberOfMessages = maxNumberOfMessages; + } + + + + /******************************************************************************* + ** Fluent setter for maxNumberOfMessages + *******************************************************************************/ + public SQSPollerSettings withMaxNumberOfMessages(Integer maxNumberOfMessages) + { + this.maxNumberOfMessages = maxNumberOfMessages; + return (this); + } + + + + /******************************************************************************* + ** Getter for waitTimeSeconds + *******************************************************************************/ + public Integer getWaitTimeSeconds() + { + return (this.waitTimeSeconds); + } + + + + /******************************************************************************* + ** Setter for waitTimeSeconds + *******************************************************************************/ + public void setWaitTimeSeconds(Integer waitTimeSeconds) + { + this.waitTimeSeconds = waitTimeSeconds; + } + + + + /******************************************************************************* + ** Fluent setter for waitTimeSeconds + *******************************************************************************/ + public SQSPollerSettings withWaitTimeSeconds(Integer waitTimeSeconds) + { + this.waitTimeSeconds = waitTimeSeconds; + return (this); + } + + + + /******************************************************************************* + ** Getter for maxLoops + *******************************************************************************/ + public Integer getMaxLoops() + { + return (this.maxLoops); + } + + + + /******************************************************************************* + ** Setter for maxLoops + *******************************************************************************/ + public void setMaxLoops(Integer maxLoops) + { + this.maxLoops = maxLoops; + } + + + + /******************************************************************************* + ** Fluent setter for maxLoops + *******************************************************************************/ + public SQSPollerSettings withMaxLoops(Integer maxLoops) + { + this.maxLoops = maxLoops; + return (this); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueMetaData.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueMetaData.java new file mode 100644 index 00000000..aab79896 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueMetaData.java @@ -0,0 +1,63 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.model.metadata.queues; + + +/******************************************************************************* + ** SQS subclass of meta-data for a specific Queue + *******************************************************************************/ +public class SQSQueueMetaData extends QQueueMetaData +{ + private SQSPollerSettings pollerSettings; + + + + /******************************************************************************* + ** Getter for pollerSettings + *******************************************************************************/ + public SQSPollerSettings getPollerSettings() + { + return (this.pollerSettings); + } + + + + /******************************************************************************* + ** Setter for pollerSettings + *******************************************************************************/ + public void setPollerSettings(SQSPollerSettings pollerSettings) + { + this.pollerSettings = pollerSettings; + } + + + + /******************************************************************************* + ** Fluent setter for pollerSettings + *******************************************************************************/ + public SQSQueueMetaData withPollerSettings(SQSPollerSettings pollerSettings) + { + this.pollerSettings = pollerSettings; + return (this); + } + +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueProviderMetaData.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueProviderMetaData.java index 6184db3f..9afb5e07 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueProviderMetaData.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/metadata/queues/SQSQueueProviderMetaData.java @@ -36,6 +36,8 @@ public class SQSQueueProviderMetaData extends QQueueProviderMetaData private String region; private String baseURL; + private SQSPollerSettings pollerSettings; + /******************************************************************************* @@ -196,4 +198,35 @@ public class SQSQueueProviderMetaData extends QQueueProviderMetaData return (this); } + + + /******************************************************************************* + ** Getter for pollerSettings + *******************************************************************************/ + public SQSPollerSettings getPollerSettings() + { + return (this.pollerSettings); + } + + + + /******************************************************************************* + ** Setter for pollerSettings + *******************************************************************************/ + public void setPollerSettings(SQSPollerSettings pollerSettings) + { + this.pollerSettings = pollerSettings; + } + + + + /******************************************************************************* + ** Fluent setter for pollerSettings + *******************************************************************************/ + public SQSQueueProviderMetaData withPollerSettings(SQSPollerSettings pollerSettings) + { + this.pollerSettings = pollerSettings; + return (this); + } + } diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePollerTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePollerTest.java new file mode 100644 index 00000000..d9690bb6 --- /dev/null +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/queues/SQSQueuePollerTest.java @@ -0,0 +1,87 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2024. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.actions.queues; + + +import com.kingsrook.qqq.backend.core.BaseTest; +import com.kingsrook.qqq.backend.core.model.metadata.queues.QQueueMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSPollerSettings; +import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueMetaData; +import com.kingsrook.qqq.backend.core.model.metadata.queues.SQSQueueProviderMetaData; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +/******************************************************************************* + ** Unit test for SQSQueuePoller + *******************************************************************************/ +class SQSQueuePollerTest extends BaseTest +{ + + /******************************************************************************* + ** + *******************************************************************************/ + @Test + void testGetSqsPollerSettings() + { + /////////////////// + // defaults only // + /////////////////// + assertSettings(Integer.MAX_VALUE, 10, 20, SQSQueuePoller.getSqsPollerSettings(null, null)); + assertSettings(Integer.MAX_VALUE, 10, 20, SQSQueuePoller.getSqsPollerSettings(new SQSQueueProviderMetaData(), new SQSQueueMetaData())); + assertSettings(Integer.MAX_VALUE, 10, 20, SQSQueuePoller.getSqsPollerSettings(new SQSQueueProviderMetaData().withPollerSettings(new SQSPollerSettings()), new SQSQueueMetaData().withPollerSettings(new SQSPollerSettings()))); + + /////////////////////////////////// + // settings only in the provider // + /////////////////////////////////// + assertSettings(100, 5, 1, SQSQueuePoller.getSqsPollerSettings( + new SQSQueueProviderMetaData().withPollerSettings(new SQSPollerSettings().withMaxLoops(100).withMaxNumberOfMessages(5).withWaitTimeSeconds(1)), + new QQueueMetaData())); + + //////////////////////////////// + // settings only in the queue // + //////////////////////////////// + assertSettings(90, 4, 2, SQSQueuePoller.getSqsPollerSettings( + new SQSQueueProviderMetaData(), + new SQSQueueMetaData().withPollerSettings(new SQSPollerSettings().withMaxLoops(90).withMaxNumberOfMessages(4).withWaitTimeSeconds(2)))); + + ///////////////////////////////////////// + // mix of default, provider, and queue // + ///////////////////////////////////////// + assertSettings(Integer.MAX_VALUE, 5, 2, SQSQueuePoller.getSqsPollerSettings( + new SQSQueueProviderMetaData().withPollerSettings(new SQSPollerSettings().withMaxNumberOfMessages(5)), + new SQSQueueMetaData().withPollerSettings(new SQSPollerSettings().withWaitTimeSeconds(2)))); + } + + + + /******************************************************************************* + ** + *******************************************************************************/ + private void assertSettings(Integer maxLoops, Integer maxNumberOfMessages, Integer waitTimeSeconds, SQSPollerSettings sqsPollerSettings) + { + assertEquals(maxLoops, sqsPollerSettings.getMaxLoops()); + assertEquals(maxNumberOfMessages, sqsPollerSettings.getMaxNumberOfMessages()); + assertEquals(waitTimeSeconds, sqsPollerSettings.getWaitTimeSeconds()); + } + +} \ No newline at end of file