From 293b3e420772f775624d5f5686e7cf27abe520c6 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Fri, 9 Dec 2022 16:44:53 -0600 Subject: [PATCH] Add qruntime exception; let transform step set pipe capacity --- .../core/exceptions/QRuntimeException.java | 62 +++++++++++++++++++ .../core/model/data/QRecordEntity.java | 9 +-- .../AbstractTransformStep.java | 19 ++++++ .../StreamedETLExecuteStep.java | 7 ++- 4 files changed, 92 insertions(+), 5 deletions(-) create mode 100644 qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java new file mode 100644 index 00000000..40b58c00 --- /dev/null +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/exceptions/QRuntimeException.java @@ -0,0 +1,62 @@ +/* + * QQQ - Low-code Application Framework for Engineers. + * Copyright (C) 2021-2022. Kingsrook, LLC + * 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States + * contact@kingsrook.com + * https://github.com/Kingsrook/ + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package com.kingsrook.qqq.backend.core.exceptions; + + +/******************************************************************************* + * Base class for unchecked exceptions thrown in qqq. + * + *******************************************************************************/ +public class QRuntimeException extends RuntimeException +{ + + /******************************************************************************* + ** Constructor of message + ** + *******************************************************************************/ + public QRuntimeException(Throwable t) + { + super(t.getMessage(), t); + } + + + + /******************************************************************************* + ** Constructor of message + ** + *******************************************************************************/ + public QRuntimeException(String message) + { + super(message); + } + + + + /******************************************************************************* + ** Constructor of message & cause + ** + *******************************************************************************/ + public QRuntimeException(String message, Throwable cause) + { + super(message, cause); + } +} diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java index ed6548d0..cc2f1a9b 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/model/data/QRecordEntity.java @@ -35,6 +35,7 @@ import java.util.List; import java.util.Locale; import java.util.Optional; import com.kingsrook.qqq.backend.core.exceptions.QException; +import com.kingsrook.qqq.backend.core.exceptions.QRuntimeException; import com.kingsrook.qqq.backend.core.utils.ListingHash; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -75,7 +76,7 @@ public abstract class QRecordEntity ** Build an entity of this QRecord type from a QRecord ** *******************************************************************************/ - protected void populateFromQRecord(QRecord qRecord) throws QException + protected void populateFromQRecord(QRecord qRecord) throws QRuntimeException { try { @@ -89,7 +90,7 @@ public abstract class QRecordEntity } catch(Exception e) { - throw (new QException("Error building entity from qRecord.", e)); + throw (new QRuntimeException("Error building entity from qRecord.", e)); } } @@ -99,7 +100,7 @@ public abstract class QRecordEntity ** Convert this entity to a QRecord. ** *******************************************************************************/ - public QRecord toQRecord() throws QException + public QRecord toQRecord() throws QRuntimeException { try { @@ -115,7 +116,7 @@ public abstract class QRecordEntity } catch(Exception e) { - throw (new QException("Error building qRecord from entity.", e)); + throw (new QRuntimeException("Error building qRecord from entity.", e)); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java index 960bdf1e..4cfdcacd 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/AbstractTransformStep.java @@ -92,4 +92,23 @@ public abstract class AbstractTransformStep implements BackendStep, ProcessSumma return (transaction); } + + + /******************************************************************************* + ** Allow this transform step to specify the capacity of the pipe being used by the process. + ** + ** The specific use-case for which this is being added is, in the case of a process + ** with many records being extracted, if the transform job is too slow, then the pipe + ** can get filled, and the extractor (who puts records into the pipe) can time out + ** waiting for capacity in the pipe to open up, while a slow loader is consuming + ** the records. + ** + ** In other words, for a slow transformer, setting a lower pipe capacity can help prevent + ** time-out errors ("Giving up adding record to pipe...") + *******************************************************************************/ + public Integer getOverrideRecordPipeCapacity() + { + return (null); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java index ba808189..0e28a20a 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/etl/streamedwithfrontend/StreamedETLExecuteStep.java @@ -79,7 +79,12 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe if(loadStep.getOverrideRecordPipeCapacity() != null) { recordPipe = new RecordPipe(loadStep.getOverrideRecordPipeCapacity()); - LOG.debug("Overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity()); + LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + loadStep.getOverrideRecordPipeCapacity()); + } + else if(transformStep.getOverrideRecordPipeCapacity() != null) + { + recordPipe = new RecordPipe(transformStep.getOverrideRecordPipeCapacity()); + LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + transformStep.getOverrideRecordPipeCapacity()); } else {