diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java
new file mode 100644
index 00000000..40b58c00
--- /dev/null
+++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java
@@ -0,0 +1,62 @@
+/*
+ * QQQ - Low-code Application Framework for Engineers.
+ * Copyright (C) 2021-2022. Kingsrook, LLC
+ * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
+ * contact@kingsrook.com
+ * https://github.com/Kingsrook/
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package com.kingsrook.qqq.backend.core.exceptions;
+
+
+/*******************************************************************************
+ * Base class for unchecked exceptions thrown in qqq.
+ *
+ *******************************************************************************/
+public class QRuntimeException extends RuntimeException
+{
+
+ /*******************************************************************************
+ ** Constructor of message
+ **
+ *******************************************************************************/
+ public QRuntimeException(Throwable t)
+ {
+ super(t.getMessage(), t);
+ }
+
+
+
+ /*******************************************************************************
+ ** Constructor of message
+ **
+ *******************************************************************************/
+ public QRuntimeException(String message)
+ {
+ super(message);
+ }
+
+
+
+ /*******************************************************************************
+ ** Constructor of message & cause
+ **
+ *******************************************************************************/
+ public QRuntimeException(String message, Throwable cause)
+ {
+ super(message, cause);
+ }
+}
diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java
index ed6548d0..cc2f1a9b 100644
--- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java
+++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java
@@ -35,6 +35,7 @@ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import com.kingsrook.qqq.backend.core.exceptions.QException;
+import com.kingsrook.qqq.backend.core.exceptions.QRuntimeException;
import com.kingsrook.qqq.backend.core.utils.ListingHash;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -75,7 +76,7 @@ public abstract class QRecordEntity
** Build an entity of this QRecord type from a QRecord
**
*******************************************************************************/
- protected void populateFromQRecord(QRecord qRecord) throws QException
+ protected void populateFromQRecord(QRecord qRecord) throws QRuntimeException
{
try
{
@@ -89,7 +90,7 @@ public abstract class QRecordEntity
}
catch(Exception e)
{
- throw (new QException("Error building entity from qRecord.", e));
+ throw (new QRuntimeException("Error building entity from qRecord.", e));
}
}
@@ -99,7 +100,7 @@ public abstract class QRecordEntity
** Convert this entity to a QRecord.
**
*******************************************************************************/
- public QRecord toQRecord() throws QException
+ public QRecord toQRecord() throws QRuntimeException
{
try
{
@@ -115,7 +116,7 @@ public abstract class QRecordEntity
}
catch(Exception e)
{
- throw (new QException("Error building qRecord from entity.", e));
+ throw (new QRuntimeException("Error building qRecord from entity.", e));
}
}
diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java
index 960bdf1e..4cfdcacd 100644
--- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java
+++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java
@@ -92,4 +92,23 @@ public abstract class AbstractTransformStep implements BackendStep, ProcessSumma
return (transaction);
}
+
+
+ /*******************************************************************************
+ ** Allow this transform step to specify the capacity of the pipe being used by the process.
+ **
+ ** The specific use-case for which this is being added is, in the case of a process
+ ** with many records being extracted, if the transform job is too slow, then the pipe
+ ** can get filled, and the extractor (who puts records into the pipe) can time out
+ ** waiting for capacity in the pipe to open up, while a slow loader is consuming
+ ** the records.
+ **
+ ** In other words, for a slow transformer, setting a lower pipe capacity can help prevent
+ ** time-out errors ("Giving up adding record to pipe...")
+ *******************************************************************************/
+ public Integer getOverrideRecordPipeCapacity()
+ {
+ return (null);
+ }
+
}
diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java
index ba808189..0e28a20a 100644
--- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java
+++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java
@@ -79,7 +79,12 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
if(loadStep.getOverrideRecordPipeCapacity() != null)
{
recordPipe = new RecordPipe(loadStep.getOverrideRecordPipeCapacity());
- LOG.debug("Overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity());
+ LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity());
+ }
+ else if(transformStep.getOverrideRecordPipeCapacity() != null)
+ {
+ recordPipe = new RecordPipe(transformStep.getOverrideRecordPipeCapacity());
+ LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + transformStep.getOverrideRecordPipeCapacity());
}
else
{