diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java index 0a1273dc..0eddc2e3 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncRecordPipeLoop.java @@ -74,10 +74,11 @@ public class AsyncRecordPipeLoop AsyncJobState jobState = AsyncJobState.RUNNING; AsyncJobStatus asyncJobStatus = null; - int recordCount = 0; - int nextSleepMillis = INIT_SLEEP_MS; - long lastReceivedRecordsAt = System.currentTimeMillis(); - long jobStartTime = System.currentTimeMillis(); + int recordCount = 0; + int nextSleepMillis = INIT_SLEEP_MS; + long lastReceivedRecordsAt = System.currentTimeMillis(); + long jobStartTime = System.currentTimeMillis(); + boolean everCalledConsumer = false; while(jobState.equals(AsyncJobState.RUNNING)) { @@ -105,6 +106,7 @@ public class AsyncRecordPipeLoop lastReceivedRecordsAt = System.currentTimeMillis(); nextSleepMillis = INIT_SLEEP_MS; + everCalledConsumer = true; recordCount += consumer.get(); LOG.debug(String.format("Processed %,d records so far", recordCount)); @@ -148,10 +150,19 @@ public class AsyncRecordPipeLoop throw (new QException("Job failed with an error", asyncJobStatus.getCaughtException())); } - //////////////////////////////////////////// - // send the final records to the consumer // - //////////////////////////////////////////// - recordCount += consumer.get(); + /////////////////////////////////////////////////////////////////////////////////////////// + // send the final records to the consumer // + // note - we'll only make this "final" call to the consumer if: // + // - there are currently records in the pipe // + // - OR we never called the consumer (e.g., there were 0 rows produced by the supplier // + // This prevents cases where a consumer may get pages of records in the loop, but then // + // be called here post-loop w/ 0 records, and may interpret it as a sign that no records // + // were ever supplied. // + /////////////////////////////////////////////////////////////////////////////////////////// + if(recordPipe.countAvailableRecords() > 0 || !everCalledConsumer) + { + recordCount += consumer.get(); + } long endTime = System.currentTimeMillis();