Make pipeLoop minRecords a parameter; add input to getOverrideRecordPipeCapacity

This commit is contained in:
2023-03-15 17:00:09 -05:00
parent b6e089a364
commit 0e01372200
4 changed files with 60 additions and 17 deletions

View File

@ -45,9 +45,10 @@ public class AsyncRecordPipeLoop
private static final int TIMEOUT_AFTER_NO_RECORDS_MS = 10 * 60 * 1000;
private static final int MAX_SLEEP_MS = 1000;
private static final int INIT_SLEEP_MS = 10;
private static final int MIN_RECORDS_TO_CONSUME = 10;
private static final int MAX_SLEEP_MS = 1000;
private static final int INIT_SLEEP_MS = 10;
private Integer minRecordsToConsume = 10;
@ -83,7 +84,7 @@ public class AsyncRecordPipeLoop
while(jobState.equals(AsyncJobState.RUNNING))
{
if(recordPipe.countAvailableRecords() < MIN_RECORDS_TO_CONSUME)
if(recordPipe.countAvailableRecords() < minRecordsToConsume)
{
///////////////////////////////////////////////////////////////
// if the pipe is too empty, sleep to let the producer work. //
@ -176,4 +177,35 @@ public class AsyncRecordPipeLoop
return (recordCount);
}
/*******************************************************************************
** Getter for minRecordsToConsume
*******************************************************************************/
public Integer getMinRecordsToConsume()
{
return (this.minRecordsToConsume);
}
/*******************************************************************************
** Setter for minRecordsToConsume
*******************************************************************************/
public void setMinRecordsToConsume(Integer minRecordsToConsume)
{
this.minRecordsToConsume = minRecordsToConsume;
}
/*******************************************************************************
** Fluent setter for minRecordsToConsume
*******************************************************************************/
public AsyncRecordPipeLoop withMinRecordsToConsume(Integer minRecordsToConsume)
{
this.minRecordsToConsume = minRecordsToConsume;
return (this);
}
}

View File

@ -117,7 +117,7 @@ public abstract class AbstractLoadStep implements BackendStep
** In other words, for a slow loader, setting a lower pipe capacity can help prevent
** time-out errors ("Giving up adding record to pipe...")
*******************************************************************************/
public Integer getOverrideRecordPipeCapacity()
public Integer getOverrideRecordPipeCapacity(RunBackendStepInput runBackendStepInput)
{
return (null);
}

View File

@ -106,7 +106,7 @@ public abstract class AbstractTransformStep implements BackendStep, ProcessSumma
** In other words, for a slow transformer, setting a lower pipe capacity can help prevent
** time-out errors ("Giving up adding record to pipe...")
*******************************************************************************/
public Integer getOverrideRecordPipeCapacity()
public Integer getOverrideRecordPipeCapacity(RunBackendStepInput runBackendStepInput)
{
return (null);
}

View File

@ -80,19 +80,24 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
// before it can put more records in. //
/////////////////////////////////////////////////////////////////////////////
RecordPipe recordPipe;
if(loadStep.getOverrideRecordPipeCapacity() != null)
Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
if(overrideRecordPipeCapacity != null)
{
recordPipe = new RecordPipe(loadStep.getOverrideRecordPipeCapacity());
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity());
}
else if(transformStep.getOverrideRecordPipeCapacity() != null)
{
recordPipe = new RecordPipe(transformStep.getOverrideRecordPipeCapacity());
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + transformStep.getOverrideRecordPipeCapacity());
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
}
else
{
recordPipe = new RecordPipe();
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
if(overrideRecordPipeCapacity != null)
{
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
}
else
{
recordPipe = new RecordPipe();
}
}
extractStep.setRecordPipe(recordPipe);
@ -112,8 +117,14 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
transformStep.setTransaction(transaction);
}
List<QRecord> loadedRecordList = new ArrayList<>();
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) ->
List<QRecord> loadedRecordList = new ArrayList<>();
AsyncRecordPipeLoop asyncRecordPipeLoop = new AsyncRecordPipeLoop();
if(overrideRecordPipeCapacity != null && overrideRecordPipeCapacity < asyncRecordPipeLoop.getMinRecordsToConsume())
{
asyncRecordPipeLoop.setMinRecordsToConsume(overrideRecordPipeCapacity);
}
int recordCount = asyncRecordPipeLoop.run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) ->
{
extractStep.run(runBackendStepInput, runBackendStepOutput);
return (runBackendStepOutput);