From 2975b905054733ce46cc90d4878c57ef193cf649 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Wed, 9 Nov 2022 08:43:05 -0600 Subject: [PATCH] Allowing load step to set pipe sizes to avoid 'Giving up adding record to pipe' in easypost tracker creation; Make status never have current > total; --- .../core/actions/async/AsyncJobCallback.java | 21 +++-- .../core/actions/reporting/RecordPipe.java | 22 +++++ .../scripts/StoreAssociatedScriptAction.java | 5 + .../scripts/StoreAssociatedScriptOutput.java | 93 +++++++++++++++++++ .../AbstractLoadStep.java | 20 ++++ .../StreamedETLExecuteStep.java | 26 +++++- .../actions/async/AsyncJobCallbackTest.java | 86 +++++++++++++++++ qqq-dev-tools/bin/xbar-circleci-latest.sh | 21 ++--- 8 files changed, 272 insertions(+), 22 deletions(-) create mode 100644 qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallbackTest.java diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java index 77217531..4623914e 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallback.java @@ -78,9 +78,7 @@ public class AsyncJobCallback public void updateStatus(String message, int current, int total) { this.asyncJobStatus.setMessage(message); - this.asyncJobStatus.setCurrent(current); - this.asyncJobStatus.setTotal(total); - storeUpdatedStatus(); + updateStatus(current, total); // this call will storeUpdatedStatus. } @@ -90,7 +88,7 @@ public class AsyncJobCallback *******************************************************************************/ public void updateStatus(int current, int total) { - this.asyncJobStatus.setCurrent(current); + this.asyncJobStatus.setCurrent(current > total ? total : current); this.asyncJobStatus.setTotal(total); storeUpdatedStatus(); } @@ -112,9 +110,20 @@ public class AsyncJobCallback *******************************************************************************/ public void incrementCurrent(int amount) { - if(this.asyncJobStatus.getCurrent() != null) + if(asyncJobStatus.getCurrent() != null) { - this.asyncJobStatus.setCurrent(this.asyncJobStatus.getCurrent() + amount); + if(asyncJobStatus.getTotal() != null && asyncJobStatus.getCurrent() + amount > asyncJobStatus.getTotal()) + { + ///////////////////////////////////////////////////// + // make sure we don't ever make current > total... // + ///////////////////////////////////////////////////// + asyncJobStatus.setCurrent(asyncJobStatus.getTotal()); + } + else + { + asyncJobStatus.setCurrent(asyncJobStatus.getCurrent() + amount); + } + storeUpdatedStatus(); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java index 218e920a..39e9fa69 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/reporting/RecordPipe.java @@ -57,6 +57,26 @@ public class RecordPipe + /******************************************************************************* + ** Default constructor. + *******************************************************************************/ + public RecordPipe() + { + + } + + + + /******************************************************************************* + ** Construct a record pipe, with an alternative capacity for the internal queue. + *******************************************************************************/ + public RecordPipe(Integer overrideCapacity) + { + queue = new ArrayBlockingQueue<>(overrideCapacity); + } + + + /******************************************************************************* ** Turn off the pipe. Stop accepting new records (just ignore them in the add ** method). Clear the existing queue. Don't return any more records. Note that @@ -109,6 +129,7 @@ public class RecordPipe if(!offerResult && !isTerminated) { + LOG.debug("Pipe is full. Waiting."); long sleepLoopStartTime = System.currentTimeMillis(); long now = System.currentTimeMillis(); while(!offerResult && !isTerminated) @@ -123,6 +144,7 @@ public class RecordPipe offerResult = queue.offer(record); now = System.currentTimeMillis(); } + LOG.debug("Pipe has opened up. Resuming."); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/scripts/StoreAssociatedScriptAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/scripts/StoreAssociatedScriptAction.java index c4bfe711..884e2f19 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/scripts/StoreAssociatedScriptAction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/scripts/StoreAssociatedScriptAction.java @@ -218,5 +218,10 @@ public class StoreAssociatedScriptAction updateInput.setTableName("script"); updateInput.setRecords(List.of(script)); new UpdateAction().execute(updateInput); + + output.setScriptId(script.getValueInteger("id")); + output.setScriptName(script.getValueString("name")); + output.setScriptRevisionId(scriptRevision.getValueInteger("id")); + output.setScriptRevisionSequenceNo(scriptRevision.getValueInteger("sequenceNo")); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/scripts/StoreAssociatedScriptOutput.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/scripts/StoreAssociatedScriptOutput.java index 12549e0d..41d48d7d 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/scripts/StoreAssociatedScriptOutput.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/actions/scripts/StoreAssociatedScriptOutput.java @@ -30,4 +30,97 @@ import com.kingsrook.qqq.backend.core.model.actions.AbstractActionOutput; *******************************************************************************/ public class StoreAssociatedScriptOutput extends AbstractActionOutput { + private Integer scriptId; + private String scriptName; + private Integer scriptRevisionId; + private Integer scriptRevisionSequenceNo; + + + + /******************************************************************************* + ** Getter for scriptId + ** + *******************************************************************************/ + public Integer getScriptId() + { + return scriptId; + } + + + + /******************************************************************************* + ** Setter for scriptId + ** + *******************************************************************************/ + public void setScriptId(Integer scriptId) + { + this.scriptId = scriptId; + } + + + + /******************************************************************************* + ** Getter for scriptName + ** + *******************************************************************************/ + public String getScriptName() + { + return scriptName; + } + + + + /******************************************************************************* + ** Setter for scriptName + ** + *******************************************************************************/ + public void setScriptName(String scriptName) + { + this.scriptName = scriptName; + } + + + + /******************************************************************************* + ** Getter for scriptRevisionId + ** + *******************************************************************************/ + public Integer getScriptRevisionId() + { + return scriptRevisionId; + } + + + + /******************************************************************************* + ** Setter for scriptRevisionId + ** + *******************************************************************************/ + public void setScriptRevisionId(Integer scriptRevisionId) + { + this.scriptRevisionId = scriptRevisionId; + } + + + + /******************************************************************************* + ** Getter for scriptRevisionSequenceNo + ** + *******************************************************************************/ + public Integer getScriptRevisionSequenceNo() + { + return scriptRevisionSequenceNo; + } + + + + /******************************************************************************* + ** Setter for scriptRevisionSequenceNo + ** + *******************************************************************************/ + public void setScriptRevisionSequenceNo(Integer scriptRevisionSequenceNo) + { + this.scriptRevisionSequenceNo = scriptRevisionSequenceNo; + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java index 58dcbc3a..6d47e3dd 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractLoadStep.java @@ -102,4 +102,24 @@ public abstract class AbstractLoadStep implements BackendStep { return (transaction); } + + + + /******************************************************************************* + ** Allow this load step to specify the capacity of the pipe being used by the process. + ** + ** The specific use-case for which this is being added is, in the case of a process + ** with many records being extracted, if the load job is too slow, then the pipe + ** can get filled, and the extractor (who puts records into the pipe) can time out + ** waiting for capacity in the pipe to open up, while a slow loader is consuming + ** the records. + ** + ** In other words, for a slow loader, setting a lower pipe capacity can help prevent + ** time-out errors ("Giving up adding record to pipe...") + *******************************************************************************/ + public Integer getOverrideRecordPipeCapacity() + { + return (null); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index fb8eae1a..ba808189 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -65,14 +65,30 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe /////////////////////////////////////////////////////// // set up the extract, transform, and load functions // /////////////////////////////////////////////////////// - RecordPipe recordPipe = new RecordPipe(); - AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); - extractStep.setRecordPipe(recordPipe); - extractStep.preRun(runBackendStepInput, runBackendStepOutput); - + AbstractExtractStep extractStep = getExtractStep(runBackendStepInput); AbstractTransformStep transformStep = getTransformStep(runBackendStepInput); AbstractLoadStep loadStep = getLoadStep(runBackendStepInput); + ///////////////////////////////////////////////////////////////////////////// + // let the load step override the capacity for the record pipe. // + // this is useful for slower load steps - so that the extract step doesn't // + // fill the pipe, then timeout waiting for all the records to be consumed, // + // before it can put more records in. // + ///////////////////////////////////////////////////////////////////////////// + RecordPipe recordPipe; + if(loadStep.getOverrideRecordPipeCapacity() != null) + { + recordPipe = new RecordPipe(loadStep.getOverrideRecordPipeCapacity()); + LOG.debug("Overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity()); + } + else + { + recordPipe = new RecordPipe(); + } + + extractStep.setRecordPipe(recordPipe); + extractStep.preRun(runBackendStepInput, runBackendStepOutput); + transformStep.preRun(runBackendStepInput, runBackendStepOutput); loadStep.preRun(runBackendStepInput, runBackendStepOutput); diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallbackTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallbackTest.java new file mode 100644 index 00000000..062cbe70 --- /dev/null +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobCallbackTest.java @@ -0,0 +1,86 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.actions.async; + + +import java.util.UUID; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +/******************************************************************************* + ** Unit test for AsyncJobCallback + *******************************************************************************/ +class AsyncJobCallbackTest +{ + + /******************************************************************************* + ** + *******************************************************************************/ + @Test + void test() + { + AsyncJobStatus asyncJobStatus = new AsyncJobStatus(); + AsyncJobCallback asyncJobCallback = new AsyncJobCallback(UUID.randomUUID(), asyncJobStatus); + + ///////////////////////////////////////////////////// + // make sure current never goes greater than total // + ///////////////////////////////////////////////////// + asyncJobCallback.updateStatus(3, 2); + assertEquals(2, asyncJobStatus.getTotal()); + assertEquals(2, asyncJobStatus.getCurrent()); + + asyncJobCallback.updateStatus("With Message", 3, 2); + assertEquals(2, asyncJobStatus.getTotal()); + assertEquals(2, asyncJobStatus.getCurrent()); + + ////////////////////////// + // reset, then count up // + ////////////////////////// + asyncJobCallback.updateStatus(1, 3); + assertEquals(3, asyncJobStatus.getTotal()); + assertEquals(1, asyncJobStatus.getCurrent()); + + asyncJobCallback.incrementCurrent(); + assertEquals(2, asyncJobStatus.getCurrent()); + + asyncJobCallback.incrementCurrent(); + assertEquals(3, asyncJobStatus.getCurrent()); + + ///////////////////////////////// + // try to go to 4 - stay at 3. // + ///////////////////////////////// + asyncJobCallback.incrementCurrent(); + assertEquals(3, asyncJobStatus.getCurrent()); + + /////////// + // reset // + /////////// + asyncJobCallback.updateStatus(1, 3); + assertEquals(3, asyncJobStatus.getTotal()); + assertEquals(1, asyncJobStatus.getCurrent()); + + asyncJobCallback.incrementCurrent(4); + assertEquals(3, asyncJobStatus.getCurrent()); + } + +} \ No newline at end of file diff --git a/qqq-dev-tools/bin/xbar-circleci-latest.sh b/qqq-dev-tools/bin/xbar-circleci-latest.sh index 096e28c2..899aeba1 100755 --- a/qqq-dev-tools/bin/xbar-circleci-latest.sh +++ b/qqq-dev-tools/bin/xbar-circleci-latest.sh @@ -24,14 +24,17 @@ checkBuild() index=$1 repo=$($JQ ".[$i].reponame" < $FILE | sed 's/"//g') + branch=$($JQ ".[$i].branch" < $FILE | sed 's/"//g;s/null//;') + tag=$($JQ ".[$i].vcs_tag" < $FILE | sed 's/"//g;s/null//;') buildStatus=$($JQ ".[$i].status" < $FILE | sed 's/"//g') url=$($JQ ".[$i].build_url" < $FILE | sed 's/"//g') jobName=$($JQ ".[$i].workflows.job_name" < $FILE | sed 's/"//g') avatarUrl=$($JQ ".[$i].user.avatar_url" < $FILE | sed 's/"//g') - date=$($JQ ".[$i].queued_at" < $FILE | sed 's/"//g') - if [ "$date" == "null" ]; then - date=$($JQ ".[$i].committer_date" < $FILE | sed 's/"//g') + startDate=$($JQ ".[$i].queued_at" < $FILE | sed 's/"//g') + if [ "$startDate" == "null" ]; then + startDate=$($JQ ".[$i].committer_date" < $FILE | sed 's/"//g') fi + endDate=$($JQ ".[$i].stop_time" < $FILE | sed 's/"//g;s/null//;') curl $avatarUrl > /tmp/avatar.jpg sips -s dpiHeight 96 -s dpiWidth 96 /tmp/avatar.jpg -o /tmp/avatar-96dpi.jpg > /dev/null @@ -41,18 +44,14 @@ checkBuild() shortRepo="$repo" case $repo in - qqq-backend-core) shortRepo="core";; - qqq-backend-module-filesystem) shortRepo="fs";; - qqq-backend-module-rdbms) shortRepo="db";; - qqq-middleware-javalin) shortRepo="j'lin";; - qqq-middleware-picocli) shortRepo="p'cli";; - qqq-sample-project) shortRepo="samp";; + qqq) shortRepo="qqq";; qqq-frontend-core) shortRepo="f'core";; qqq-frontend-material-dashboard) shortRepo="m-db";; Nutrifresh-One) shortRepo="nf1";; + Nutrifresh-One-Scripts) shortRepo="nf1-scr";; esac - timestamp=$(date -j -f "%Y-%m-%dT%H:%M:%S%z" $(echo "$date" | sed 's/\....Z/+0000/') +%s) + timestamp=$(date -j -f "%Y-%m-%dT%H:%M:%S%z" $(echo "$startDate" | sed 's/\....Z/+0000/') +%s) seconds=$(( $NOW - $timestamp )) if [ $seconds -lt 120 ]; then age="$seconds seconds" @@ -90,7 +89,7 @@ checkBuild() if [ $index -lt 1 -o $seconds -lt 600 ]; then echo -n "${shortRepo}(${shortAge})${icon} " fi - details="$details\n$repo: $jobName: $buildStatus @ $age ago | color=$color | href=$url | image=$avatarB64" + details="$details\n$repo/${branch}${tag}: $jobName: $buildStatus @ $age ago | color=$color | href=$url | image=$avatarB64" } details="---"