diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java index 3817ad2d..03d10708 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java @@ -45,9 +45,10 @@ public class AsyncRecordPipeLoop private static final int TIMEOUT_AFTER_NO_RECORDS_MS = 10 * 60 * 1000; - private static final int MAX_SLEEP_MS = 1000; - private static final int INIT_SLEEP_MS = 10; - private static final int MIN_RECORDS_TO_CONSUME = 10; + private static final int MAX_SLEEP_MS = 1000; + private static final int INIT_SLEEP_MS = 10; + + private Integer minRecordsToConsume = 10; @@ -83,7 +84,7 @@ public class AsyncRecordPipeLoop while(jobState.equals(AsyncJobState.RUNNING)) { - if(recordPipe.countAvailableRecords() < MIN_RECORDS_TO_CONSUME) + if(recordPipe.countAvailableRecords() < minRecordsToConsume) { /////////////////////////////////////////////////////////////// // if the pipe is too empty, sleep to let the producer work. // @@ -176,4 +177,35 @@ public class AsyncRecordPipeLoop return (recordCount); } + + + /******************************************************************************* + ** Getter for minRecordsToConsume + *******************************************************************************/ + public Integer getMinRecordsToConsume() + { + return (this.minRecordsToConsume); + } + + + + /******************************************************************************* + ** Setter for minRecordsToConsume + *******************************************************************************/ + public void setMinRecordsToConsume(Integer minRecordsToConsume) + { + this.minRecordsToConsume = minRecordsToConsume; + } + + + + /******************************************************************************* + ** Fluent setter for minRecordsToConsume + *******************************************************************************/ + public AsyncRecordPipeLoop withMinRecordsToConsume(Integer minRecordsToConsume) + { + this.minRecordsToConsume = minRecordsToConsume; + return (this); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java index 86225eb2..fa7b086a 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java @@ -117,7 +117,7 @@ public abstract class AbstractLoadStep implements BackendStep ** In other words, for a slow loader, setting a lower pipe capacity can help prevent ** time-out errors ("Giving up adding record to pipe...") *******************************************************************************/ - public Integer getOverrideRecordPipeCapacity() + public Integer getOverrideRecordPipeCapacity(RunBackendStepInput runBackendStepInput) { return (null); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java index 6a353e50..187ae972 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java @@ -106,7 +106,7 @@ public abstract class AbstractTransformStep implements BackendStep, ProcessSumma ** In other words, for a slow transformer, setting a lower pipe capacity can help prevent ** time-out errors ("Giving up adding record to pipe...") *******************************************************************************/ - public Integer getOverrideRecordPipeCapacity() + public Integer getOverrideRecordPipeCapacity(RunBackendStepInput runBackendStepInput) { return (null); } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index 192c1b4d..2b61ead3 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -80,19 +80,24 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe // before it can put more records in. // ///////////////////////////////////////////////////////////////////////////// RecordPipe recordPipe; - if(loadStep.getOverrideRecordPipeCapacity() != null) + Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput); + if(overrideRecordPipeCapacity != null) { - recordPipe = new RecordPipe(loadStep.getOverrideRecordPipeCapacity()); - LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity()); - } - else if(transformStep.getOverrideRecordPipeCapacity() != null) - { - recordPipe = new RecordPipe(transformStep.getOverrideRecordPipeCapacity()); - LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + transformStep.getOverrideRecordPipeCapacity()); + recordPipe = new RecordPipe(overrideRecordPipeCapacity); + LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); } else { - recordPipe = new RecordPipe(); + overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput); + if(overrideRecordPipeCapacity != null) + { + recordPipe = new RecordPipe(overrideRecordPipeCapacity); + LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity); + } + else + { + recordPipe = new RecordPipe(); + } } extractStep.setRecordPipe(recordPipe); @@ -112,8 +117,14 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe transformStep.setTransaction(transaction); } - List loadedRecordList = new ArrayList<>(); - int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) -> + List loadedRecordList = new ArrayList<>(); + AsyncRecordPipeLoop asyncRecordPipeLoop = new AsyncRecordPipeLoop(); + if(overrideRecordPipeCapacity != null && overrideRecordPipeCapacity < asyncRecordPipeLoop.getMinRecordsToConsume()) + { + asyncRecordPipeLoop.setMinRecordsToConsume(overrideRecordPipeCapacity); + } + + int recordCount = asyncRecordPipeLoop.run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) -> { extractStep.run(runBackendStepInput, runBackendStepOutput); return (runBackendStepOutput);