mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-21 06:28:44 +00:00
Compare commits
1 Commits
snapshot-f
...
snapshot-f
Author | SHA1 | Date | |
---|---|---|---|
b0aaf61e99 |
@ -83,15 +83,6 @@ public class AsyncRecordPipeLoop
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
boolean everCalledConsumer = false;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// in case the pipe capacity has been made very small (useful in tests!), //
|
||||
// then make the minRecordsToConsume match it. //
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
if(recordPipe.getCapacity() < minRecordsToConsume)
|
||||
{
|
||||
minRecordsToConsume = recordPipe.getCapacity();
|
||||
}
|
||||
|
||||
while(jobState.equals(AsyncJobState.RUNNING))
|
||||
{
|
||||
if(recordPipe.countAvailableRecords() < minRecordsToConsume)
|
||||
|
@ -273,7 +273,7 @@ public class GenerateReportAction
|
||||
RunBackendStepOutput transformStepOutput = null;
|
||||
if(tableView != null && tableView.getRecordTransformStep() != null)
|
||||
{
|
||||
transformStep = QCodeLoader.getAdHoc(AbstractTransformStep.class, tableView.getRecordTransformStep());
|
||||
transformStep = QCodeLoader.getBackendStep(AbstractTransformStep.class, tableView.getRecordTransformStep());
|
||||
|
||||
transformStepInput = new RunBackendStepInput();
|
||||
transformStepInput.setValues(reportInput.getInputValues());
|
||||
|
@ -44,8 +44,7 @@ public class RecordPipe
|
||||
private static final long BLOCKING_SLEEP_MILLIS = 100;
|
||||
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
|
||||
|
||||
private int capacity = 1_000;
|
||||
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(capacity);
|
||||
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(1_000);
|
||||
|
||||
private boolean isTerminated = false;
|
||||
|
||||
@ -73,7 +72,6 @@ public class RecordPipe
|
||||
*******************************************************************************/
|
||||
public RecordPipe(Integer overrideCapacity)
|
||||
{
|
||||
this.capacity = overrideCapacity;
|
||||
queue = new ArrayBlockingQueue<>(overrideCapacity);
|
||||
}
|
||||
|
||||
@ -215,14 +213,4 @@ public class RecordPipe
|
||||
this.postRecordActions = postRecordActions;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for capacity
|
||||
**
|
||||
*******************************************************************************/
|
||||
public int getCapacity()
|
||||
{
|
||||
return capacity;
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,9 @@
|
||||
package com.kingsrook.qqq.backend.core.context;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Stack;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
@ -31,6 +34,7 @@ import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.AbstractActionInput;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -47,6 +51,7 @@ public class QContext
|
||||
private static ThreadLocal<QBackendTransaction> qBackendTransactionThreadLocal = new ThreadLocal<>();
|
||||
private static ThreadLocal<Stack<AbstractActionInput>> actionStackThreadLocal = new ThreadLocal<>();
|
||||
|
||||
private static ThreadLocal<Map<String, Serializable>> objectsThreadLocal = new ThreadLocal<>();
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -132,6 +137,7 @@ public class QContext
|
||||
qSessionThreadLocal.remove();
|
||||
qBackendTransactionThreadLocal.remove();
|
||||
actionStackThreadLocal.remove();
|
||||
objectsThreadLocal.remove();
|
||||
}
|
||||
|
||||
|
||||
@ -259,4 +265,92 @@ public class QContext
|
||||
|
||||
return (Optional.of(actionStackThreadLocal.get().get(0)));
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** get one named object from the Context for the current thread. may return null.
|
||||
*******************************************************************************/
|
||||
public static Serializable getObject(String key)
|
||||
{
|
||||
if(objectsThreadLocal.get() == null)
|
||||
{
|
||||
return null;
|
||||
}
|
||||
return objectsThreadLocal.get().get(key);
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** get one named object from the Context for the current thread, cast to the
|
||||
** specified type if possible. if not found, or wrong type, empty is returned.
|
||||
*******************************************************************************/
|
||||
public static <T extends Serializable> Optional<T> getObject(String key, Class<T> type)
|
||||
{
|
||||
Serializable object = getObject(key);
|
||||
|
||||
if(type.isInstance(object))
|
||||
{
|
||||
return Optional.of(type.cast(object));
|
||||
}
|
||||
else if(object == null)
|
||||
{
|
||||
return Optional.empty();
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG.warn("Unexpected type of object found in session under key [" + key + "]",
|
||||
logPair("expectedType", type.getName()),
|
||||
logPair("actualType", object.getClass().getName())
|
||||
);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** put a named object into the Context for the current thread.
|
||||
*******************************************************************************/
|
||||
public static void setObject(String key, Serializable object)
|
||||
{
|
||||
if(objectsThreadLocal.get() == null)
|
||||
{
|
||||
objectsThreadLocal.set(new HashMap<>());
|
||||
}
|
||||
objectsThreadLocal.get().put(key, object);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** remove a named object from the Context of the current thread.
|
||||
*******************************************************************************/
|
||||
public static void removeObject(String key)
|
||||
{
|
||||
if(objectsThreadLocal.get() != null)
|
||||
{
|
||||
objectsThreadLocal.get().remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** get the full map of named objects for the current thread (possibly null).
|
||||
*******************************************************************************/
|
||||
public static Map<String, Serializable> getObjects()
|
||||
{
|
||||
return objectsThreadLocal.get();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** fully replace the map of named objets for the current thread.
|
||||
*******************************************************************************/
|
||||
public static void setObjects(Map<String, Serializable> objects)
|
||||
{
|
||||
objectsThreadLocal.set(objects);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,156 @@
|
||||
/*
|
||||
* QQQ - Low-code Application Framework for Engineers.
|
||||
* Copyright (C) 2021-2024. Kingsrook, LLC
|
||||
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||
* contact@kingsrook.com
|
||||
* https://github.com/Kingsrook/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.kingsrook.qqq.backend.core.model.actions.audits;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecordEntity;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Object to accumulate multiple audit-details to be recorded under a single
|
||||
** audit per-record, within a process step. Especially useful if/when the
|
||||
** process step spreads its work out through multiple classes.
|
||||
**
|
||||
** Pattern of usage looks like:
|
||||
**
|
||||
** <pre>
|
||||
** // declare as a field (or local) w/ message for the audit headers
|
||||
** private AuditDetailAccumulator auditDetailAccumulator = new AuditDetailAccumulator("Audit header message");
|
||||
**
|
||||
** // put into thread context
|
||||
** AuditDetailAccumulator.setInContext(auditDetailAccumulator);
|
||||
**
|
||||
** // add a detail message for a record
|
||||
** auditDetailAccumulator.addAuditDetail(tableName, record, "Detail message");
|
||||
**
|
||||
** // in another class, get the accumulator from context and safely add a detail message
|
||||
** AuditDetailAccumulator.getFromContext().ifPresent(ada -> ada.addAuditDetail(tableName, record, "More Details"));
|
||||
**
|
||||
** // at the end of a step run/runOnePage method, add the accumulated audit details to step output
|
||||
** auditDetailAccumulator.getAccumulatedAuditSingleInputs().forEach(runBackendStepOutput::addAuditSingleInput);
|
||||
** auditDetailAccumulator.clear();
|
||||
** </pre>
|
||||
*******************************************************************************/
|
||||
public class AuditDetailAccumulator implements Serializable
|
||||
{
|
||||
private static final QLogger LOG = QLogger.getLogger(AuditDetailAccumulator.class);
|
||||
|
||||
private static final String objectKey = AuditDetailAccumulator.class.getSimpleName();
|
||||
|
||||
private String header;
|
||||
|
||||
private Map<TableNameAndPrimaryKey, AuditSingleInput> recordAuditInputMap = new HashMap<>();
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Constructor
|
||||
**
|
||||
*******************************************************************************/
|
||||
public AuditDetailAccumulator(String header)
|
||||
{
|
||||
this.header = header;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public void setInContext()
|
||||
{
|
||||
QContext.setObject(objectKey, this);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public static Optional<AuditDetailAccumulator> getFromContext()
|
||||
{
|
||||
return QContext.getObject(objectKey, AuditDetailAccumulator.class);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public void addAuditDetail(String tableName, QRecordEntity entity, String message)
|
||||
{
|
||||
if(entity != null)
|
||||
{
|
||||
addAuditDetail(tableName, entity.toQRecord(), message);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public void addAuditDetail(String tableName, QRecord record, String message)
|
||||
{
|
||||
QTableMetaData table = QContext.getQInstance().getTable(tableName);
|
||||
Serializable primaryKey = record.getValue(table.getPrimaryKeyField());
|
||||
if(primaryKey == null)
|
||||
{
|
||||
LOG.info("Missing primary key in input record - audit detail message will not be recorded.", logPair("message", message));
|
||||
return;
|
||||
}
|
||||
|
||||
AuditSingleInput auditSingleInput = recordAuditInputMap.computeIfAbsent(new TableNameAndPrimaryKey(tableName, primaryKey), (key) -> new AuditSingleInput(table, record, header));
|
||||
auditSingleInput.addDetail(message);
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public Collection<AuditSingleInput> getAccumulatedAuditSingleInputs()
|
||||
{
|
||||
return (recordAuditInputMap.values());
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public void clear()
|
||||
{
|
||||
recordAuditInputMap.clear();
|
||||
}
|
||||
|
||||
|
||||
private record TableNameAndPrimaryKey(String tableName, Serializable primaryKey) {}
|
||||
}
|
@ -41,7 +41,7 @@ import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
|
||||
/*******************************************************************************
|
||||
** Input data to insert a single audit record (with optional child record)..
|
||||
*******************************************************************************/
|
||||
public class AuditSingleInput
|
||||
public class AuditSingleInput implements Serializable
|
||||
{
|
||||
private String auditTableName;
|
||||
private String auditUserName;
|
||||
|
@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
||||
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
@ -37,11 +38,11 @@ import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||
** should be written to the output object's Records, noting that when running
|
||||
** as a streamed-ETL process, those input & output objects will be instances of
|
||||
** the StreamedBackendStep{Input,Output} classes, that will be associated with
|
||||
** a page of records flowing through a pipe.
|
||||
** a page of records flowing thorugh a pipe.
|
||||
**
|
||||
** Also - use the transaction member variable!!!
|
||||
*******************************************************************************/
|
||||
public abstract class AbstractLoadStep
|
||||
public abstract class AbstractLoadStep implements BackendStep
|
||||
{
|
||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||
protected QSession session;
|
||||
@ -50,25 +51,6 @@ public abstract class AbstractLoadStep
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Deprecated
|
||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
runOnePage(runBackendStepInput, runBackendStepOutput);
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** todo - make abstract when run is deleted.
|
||||
*******************************************************************************/
|
||||
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Allow subclasses to do an action before the run is complete - before any
|
||||
** pages of records are passed in.
|
||||
|
@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
||||
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
@ -39,33 +40,12 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp
|
||||
** a page of records flowing through a pipe.
|
||||
**
|
||||
*******************************************************************************/
|
||||
public abstract class AbstractTransformStep implements ProcessSummaryProviderInterface
|
||||
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
||||
{
|
||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Deprecated
|
||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
runOnePage(runBackendStepInput, runBackendStepOutput);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** todo - make abstract when run is deleted.
|
||||
*******************************************************************************/
|
||||
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Allow subclasses to do an action before the run is complete - before any
|
||||
** pages of records are passed in.
|
||||
|
@ -63,7 +63,7 @@ public class BaseStreamedETLStep
|
||||
protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput)
|
||||
{
|
||||
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
|
||||
return (QCodeLoader.getAdHoc(AbstractTransformStep.class, codeReference));
|
||||
return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference));
|
||||
}
|
||||
|
||||
|
||||
@ -74,7 +74,7 @@ public class BaseStreamedETLStep
|
||||
protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput)
|
||||
{
|
||||
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
|
||||
return (QCodeLoader.getAdHoc(AbstractLoadStep.class, codeReference));
|
||||
return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference));
|
||||
}
|
||||
|
||||
|
||||
|
@ -83,32 +83,23 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
||||
// before it can put more records in. //
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
RecordPipe recordPipe;
|
||||
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
|
||||
Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
recordPipe = new RecordPipe();
|
||||
}
|
||||
recordPipe = new RecordPipe();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,41 +81,13 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
|
||||
// basically repeat the preview step, but with no limit //
|
||||
//////////////////////////////////////////////////////////
|
||||
runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records");
|
||||
|
||||
RecordPipe recordPipe = new RecordPipe();
|
||||
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
|
||||
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
// let the transform step override the capacity for the record pipe //
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
RecordPipe recordPipe;
|
||||
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
recordPipe = new RecordPipe();
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////
|
||||
// set up the extract step //
|
||||
/////////////////////////////
|
||||
extractStep.setLimit(null);
|
||||
extractStep.setRecordPipe(recordPipe);
|
||||
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
|
||||
|
||||
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
|
||||
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
|
||||
|
||||
List<QRecord> previewRecordList = new ArrayList<>();
|
||||
|
@ -151,7 +151,6 @@ public class StreamedETLWithFrontendProcess
|
||||
.withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
|
||||
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass)))
|
||||
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass)))
|
||||
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractTransformStep.class.getName()))
|
||||
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
|
||||
.withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS)))
|
||||
);
|
||||
@ -171,8 +170,7 @@ public class StreamedETLWithFrontendProcess
|
||||
.withName(STEP_NAME_EXECUTE)
|
||||
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
|
||||
.withInputData(new QFunctionInputMetaData()
|
||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass)))
|
||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractLoadStep.class.getName())))
|
||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass))))
|
||||
.withOutputMetaData(new QFunctionOutputMetaData()
|
||||
.withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING))
|
||||
);
|
||||
|
@ -0,0 +1,81 @@
|
||||
/*
|
||||
* QQQ - Low-code Application Framework for Engineers.
|
||||
* Copyright (C) 2021-2024. Kingsrook, LLC
|
||||
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||
* contact@kingsrook.com
|
||||
* https://github.com/Kingsrook/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.kingsrook.qqq.backend.core.model.actions.audits;
|
||||
|
||||
|
||||
import java.util.Collection;
|
||||
import com.kingsrook.qqq.backend.core.BaseTest;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.utils.TestUtils;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Unit test for AuditDetailAccumulator
|
||||
*******************************************************************************/
|
||||
class AuditDetailAccumulatorTest extends BaseTest
|
||||
{
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Test
|
||||
void test()
|
||||
{
|
||||
AuditDetailAccumulator auditDetailAccumulator = new AuditDetailAccumulator("During test");
|
||||
auditDetailAccumulator.addAuditDetail(TestUtils.TABLE_NAME_PERSON, new QRecord().withValue("id", 1701), "Something happened");
|
||||
auditDetailAccumulator.addAuditDetail(TestUtils.TABLE_NAME_PERSON, new QRecord().withValue("id", 1701), "Something else happened");
|
||||
auditDetailAccumulator.addAuditDetail(TestUtils.TABLE_NAME_PERSON, new QRecord().withValue("id", 74256), "Something happened here too");
|
||||
auditDetailAccumulator.addAuditDetail(TestUtils.TABLE_NAME_ORDER, new QRecord().withValue("id", 74256), "Something happened to an order");
|
||||
|
||||
Collection<AuditSingleInput> auditSingleInputs = auditDetailAccumulator.getAccumulatedAuditSingleInputs();
|
||||
assertEquals(3, auditSingleInputs.size());
|
||||
assertThat(auditSingleInputs).anyMatch(asi -> asi.getAuditTableName().equals(TestUtils.TABLE_NAME_PERSON) && asi.getRecordId().equals(1701) && asi.getDetails().size() == 2);
|
||||
assertThat(auditSingleInputs).anyMatch(asi -> asi.getAuditTableName().equals(TestUtils.TABLE_NAME_PERSON) && asi.getRecordId().equals(74256) && asi.getDetails().size() == 1);
|
||||
assertThat(auditSingleInputs).anyMatch(asi -> asi.getAuditTableName().equals(TestUtils.TABLE_NAME_ORDER) && asi.getRecordId().equals(74256) && asi.getDetails().size() == 1);
|
||||
|
||||
auditDetailAccumulator.clear();;
|
||||
auditSingleInputs = auditDetailAccumulator.getAccumulatedAuditSingleInputs();
|
||||
assertEquals(0, auditSingleInputs.size());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Test
|
||||
void testContext()
|
||||
{
|
||||
AuditDetailAccumulator auditDetailAccumulator = new AuditDetailAccumulator("During test");
|
||||
auditDetailAccumulator.setInContext();
|
||||
|
||||
AuditDetailAccumulator.getFromContext().ifPresent(ada -> ada.addAuditDetail(TestUtils.TABLE_NAME_PERSON, new QRecord().withValue("id", 1701), "Something happened"));
|
||||
|
||||
Collection<AuditSingleInput> auditSingleInputs = auditDetailAccumulator.getAccumulatedAuditSingleInputs();
|
||||
assertEquals(1, auditSingleInputs.size());
|
||||
assertThat(auditSingleInputs).anyMatch(asi -> asi.getAuditTableName().equals(TestUtils.TABLE_NAME_PERSON) && asi.getRecordId().equals(1701) && asi.getDetails().size() == 1);
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user