Allowing load step to set pipe sizes to avoid 'Giving up adding record to pipe' in easypost tracker creation; Make status never have current > total;

This commit is contained in:
2022-11-09 08:43:05 -06:00
parent 1e09931218
commit 2975b90505
8 changed files with 272 additions and 22 deletions

View File

@ -78,9 +78,7 @@ public class AsyncJobCallback
public void updateStatus(String message, int current, int total)
{
this.asyncJobStatus.setMessage(message);
this.asyncJobStatus.setCurrent(current);
this.asyncJobStatus.setTotal(total);
storeUpdatedStatus();
updateStatus(current, total); // this call will storeUpdatedStatus.
}
@ -90,7 +88,7 @@ public class AsyncJobCallback
*******************************************************************************/
public void updateStatus(int current, int total)
{
this.asyncJobStatus.setCurrent(current);
this.asyncJobStatus.setCurrent(current > total ? total : current);
this.asyncJobStatus.setTotal(total);
storeUpdatedStatus();
}
@ -112,9 +110,20 @@ public class AsyncJobCallback
*******************************************************************************/
public void incrementCurrent(int amount)
{
if(this.asyncJobStatus.getCurrent() != null)
if(asyncJobStatus.getCurrent() != null)
{
this.asyncJobStatus.setCurrent(this.asyncJobStatus.getCurrent() + amount);
if(asyncJobStatus.getTotal() != null && asyncJobStatus.getCurrent() + amount > asyncJobStatus.getTotal())
{
/////////////////////////////////////////////////////
// make sure we don't ever make current > total... //
/////////////////////////////////////////////////////
asyncJobStatus.setCurrent(asyncJobStatus.getTotal());
}
else
{
asyncJobStatus.setCurrent(asyncJobStatus.getCurrent() + amount);
}
storeUpdatedStatus();
}
}

View File

@ -57,6 +57,26 @@ public class RecordPipe
/*******************************************************************************
** Default constructor.
*******************************************************************************/
public RecordPipe()
{
}
/*******************************************************************************
** Construct a record pipe, with an alternative capacity for the internal queue.
*******************************************************************************/
public RecordPipe(Integer overrideCapacity)
{
queue = new ArrayBlockingQueue<>(overrideCapacity);
}
/*******************************************************************************
** Turn off the pipe. Stop accepting new records (just ignore them in the add
** method). Clear the existing queue. Don't return any more records. Note that
@ -109,6 +129,7 @@ public class RecordPipe
if(!offerResult && !isTerminated)
{
LOG.debug("Pipe is full. Waiting.");
long sleepLoopStartTime = System.currentTimeMillis();
long now = System.currentTimeMillis();
while(!offerResult && !isTerminated)
@ -123,6 +144,7 @@ public class RecordPipe
offerResult = queue.offer(record);
now = System.currentTimeMillis();
}
LOG.debug("Pipe has opened up. Resuming.");
}
}

View File

@ -218,5 +218,10 @@ public class StoreAssociatedScriptAction
updateInput.setTableName("script");
updateInput.setRecords(List.of(script));
new UpdateAction().execute(updateInput);
output.setScriptId(script.getValueInteger("id"));
output.setScriptName(script.getValueString("name"));
output.setScriptRevisionId(scriptRevision.getValueInteger("id"));
output.setScriptRevisionSequenceNo(scriptRevision.getValueInteger("sequenceNo"));
}
}

View File

@ -30,4 +30,97 @@ import com.kingsrook.qqq.backend.core.model.actions.AbstractActionOutput;
*******************************************************************************/
public class StoreAssociatedScriptOutput extends AbstractActionOutput
{
private Integer scriptId;
private String scriptName;
private Integer scriptRevisionId;
private Integer scriptRevisionSequenceNo;
/*******************************************************************************
** Getter for scriptId
**
*******************************************************************************/
public Integer getScriptId()
{
return scriptId;
}
/*******************************************************************************
** Setter for scriptId
**
*******************************************************************************/
public void setScriptId(Integer scriptId)
{
this.scriptId = scriptId;
}
/*******************************************************************************
** Getter for scriptName
**
*******************************************************************************/
public String getScriptName()
{
return scriptName;
}
/*******************************************************************************
** Setter for scriptName
**
*******************************************************************************/
public void setScriptName(String scriptName)
{
this.scriptName = scriptName;
}
/*******************************************************************************
** Getter for scriptRevisionId
**
*******************************************************************************/
public Integer getScriptRevisionId()
{
return scriptRevisionId;
}
/*******************************************************************************
** Setter for scriptRevisionId
**
*******************************************************************************/
public void setScriptRevisionId(Integer scriptRevisionId)
{
this.scriptRevisionId = scriptRevisionId;
}
/*******************************************************************************
** Getter for scriptRevisionSequenceNo
**
*******************************************************************************/
public Integer getScriptRevisionSequenceNo()
{
return scriptRevisionSequenceNo;
}
/*******************************************************************************
** Setter for scriptRevisionSequenceNo
**
*******************************************************************************/
public void setScriptRevisionSequenceNo(Integer scriptRevisionSequenceNo)
{
this.scriptRevisionSequenceNo = scriptRevisionSequenceNo;
}
}

View File

@ -102,4 +102,24 @@ public abstract class AbstractLoadStep implements BackendStep
{
return (transaction);
}
/*******************************************************************************
** Allow this load step to specify the capacity of the pipe being used by the process.
**
** The specific use-case for which this is being added is, in the case of a process
** with many records being extracted, if the load job is too slow, then the pipe
** can get filled, and the extractor (who puts records into the pipe) can time out
** waiting for capacity in the pipe to open up, while a slow loader is consuming
** the records.
**
** In other words, for a slow loader, setting a lower pipe capacity can help prevent
** time-out errors ("Giving up adding record to pipe...")
*******************************************************************************/
public Integer getOverrideRecordPipeCapacity()
{
return (null);
}
}

View File

@ -65,14 +65,30 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
///////////////////////////////////////////////////////
// set up the extract, transform, and load functions //
///////////////////////////////////////////////////////
RecordPipe recordPipe = new RecordPipe();
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
extractStep.setRecordPipe(recordPipe);
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
AbstractLoadStep loadStep = getLoadStep(runBackendStepInput);
/////////////////////////////////////////////////////////////////////////////
// let the load step override the capacity for the record pipe. //
// this is useful for slower load steps - so that the extract step doesn't //
// fill the pipe, then timeout waiting for all the records to be consumed, //
// before it can put more records in. //
/////////////////////////////////////////////////////////////////////////////
RecordPipe recordPipe;
if(loadStep.getOverrideRecordPipeCapacity() != null)
{
recordPipe = new RecordPipe(loadStep.getOverrideRecordPipeCapacity());
LOG.debug("Overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity());
}
else
{
recordPipe = new RecordPipe();
}
extractStep.setRecordPipe(recordPipe);
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
loadStep.preRun(runBackendStepInput, runBackendStepOutput);

View File

@ -0,0 +1,86 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2022. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.actions.async;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
/*******************************************************************************
** Unit test for AsyncJobCallback
*******************************************************************************/
class AsyncJobCallbackTest
{
/*******************************************************************************
**
*******************************************************************************/
@Test
void test()
{
AsyncJobStatus asyncJobStatus = new AsyncJobStatus();
AsyncJobCallback asyncJobCallback = new AsyncJobCallback(UUID.randomUUID(), asyncJobStatus);
/////////////////////////////////////////////////////
// make sure current never goes greater than total //
/////////////////////////////////////////////////////
asyncJobCallback.updateStatus(3, 2);
assertEquals(2, asyncJobStatus.getTotal());
assertEquals(2, asyncJobStatus.getCurrent());
asyncJobCallback.updateStatus("With Message", 3, 2);
assertEquals(2, asyncJobStatus.getTotal());
assertEquals(2, asyncJobStatus.getCurrent());
//////////////////////////
// reset, then count up //
//////////////////////////
asyncJobCallback.updateStatus(1, 3);
assertEquals(3, asyncJobStatus.getTotal());
assertEquals(1, asyncJobStatus.getCurrent());
asyncJobCallback.incrementCurrent();
assertEquals(2, asyncJobStatus.getCurrent());
asyncJobCallback.incrementCurrent();
assertEquals(3, asyncJobStatus.getCurrent());
/////////////////////////////////
// try to go to 4 - stay at 3. //
/////////////////////////////////
asyncJobCallback.incrementCurrent();
assertEquals(3, asyncJobStatus.getCurrent());
///////////
// reset //
///////////
asyncJobCallback.updateStatus(1, 3);
assertEquals(3, asyncJobStatus.getTotal());
assertEquals(1, asyncJobStatus.getCurrent());
asyncJobCallback.incrementCurrent(4);
assertEquals(3, asyncJobStatus.getCurrent());
}
}