Update to only call consumer after the loop when it's smart to do so

This commit is contained in:
2022-10-11 11:30:28 -05:00
parent 6ec6b173b9
commit aa64f1b7f3

View File

@ -74,10 +74,11 @@ public class AsyncRecordPipeLoop
AsyncJobState jobState = AsyncJobState.RUNNING; AsyncJobState jobState = AsyncJobState.RUNNING;
AsyncJobStatus asyncJobStatus = null; AsyncJobStatus asyncJobStatus = null;
int recordCount = 0; int recordCount = 0;
int nextSleepMillis = INIT_SLEEP_MS; int nextSleepMillis = INIT_SLEEP_MS;
long lastReceivedRecordsAt = System.currentTimeMillis(); long lastReceivedRecordsAt = System.currentTimeMillis();
long jobStartTime = System.currentTimeMillis(); long jobStartTime = System.currentTimeMillis();
boolean everCalledConsumer = false;
while(jobState.equals(AsyncJobState.RUNNING)) while(jobState.equals(AsyncJobState.RUNNING))
{ {
@ -105,6 +106,7 @@ public class AsyncRecordPipeLoop
lastReceivedRecordsAt = System.currentTimeMillis(); lastReceivedRecordsAt = System.currentTimeMillis();
nextSleepMillis = INIT_SLEEP_MS; nextSleepMillis = INIT_SLEEP_MS;
everCalledConsumer = true;
recordCount += consumer.get(); recordCount += consumer.get();
LOG.debug(String.format("Processed %,d records so far", recordCount)); LOG.debug(String.format("Processed %,d records so far", recordCount));
@ -148,10 +150,19 @@ public class AsyncRecordPipeLoop
throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException())); throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException()));
} }
//////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// send the final records to the consumer // // send the final records to the consumer //
//////////////////////////////////////////// // note - we'll only make this "final" call to the consumer if: //
recordCount += consumer.get(); // - there are currently records in the pipe //
// - OR we never called the consumer (e.g., there were 0 rows produced by the supplier //
// This prevents cases where a consumer may get pages of records in the loop, but then //
// be called here post-loop w/ 0 records, and may interpret it as a sign that no records //
// were ever supplied. //
///////////////////////////////////////////////////////////////////////////////////////////
if(recordPipe.countAvailableRecords() > 0 || !everCalledConsumer)
{
recordCount += consumer.get();
}
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();