mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-21 22:48:44 +00:00
Compare commits
2 Commits
snapshot-f
...
snapshot-f
Author | SHA1 | Date | |
---|---|---|---|
dd103d323d | |||
238521aa57 |
@ -83,15 +83,6 @@ public class AsyncRecordPipeLoop
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
boolean everCalledConsumer = false;
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
// in case the pipe capacity has been made very small (useful in tests!), //
|
||||
// then make the minRecordsToConsume match it. //
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
if(recordPipe.getCapacity() < minRecordsToConsume)
|
||||
{
|
||||
minRecordsToConsume = recordPipe.getCapacity();
|
||||
}
|
||||
|
||||
while(jobState.equals(AsyncJobState.RUNNING))
|
||||
{
|
||||
if(recordPipe.countAvailableRecords() < minRecordsToConsume)
|
||||
|
@ -273,7 +273,7 @@ public class GenerateReportAction
|
||||
RunBackendStepOutput transformStepOutput = null;
|
||||
if(tableView != null && tableView.getRecordTransformStep() != null)
|
||||
{
|
||||
transformStep = QCodeLoader.getAdHoc(AbstractTransformStep.class, tableView.getRecordTransformStep());
|
||||
transformStep = QCodeLoader.getBackendStep(AbstractTransformStep.class, tableView.getRecordTransformStep());
|
||||
|
||||
transformStepInput = new RunBackendStepInput();
|
||||
transformStepInput.setValues(reportInput.getInputValues());
|
||||
|
@ -44,8 +44,7 @@ public class RecordPipe
|
||||
private static final long BLOCKING_SLEEP_MILLIS = 100;
|
||||
private static final long MAX_SLEEP_LOOP_MILLIS = 300_000; // 5 minutes
|
||||
|
||||
private int capacity = 1_000;
|
||||
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(capacity);
|
||||
private ArrayBlockingQueue<QRecord> queue = new ArrayBlockingQueue<>(1_000);
|
||||
|
||||
private boolean isTerminated = false;
|
||||
|
||||
@ -73,7 +72,6 @@ public class RecordPipe
|
||||
*******************************************************************************/
|
||||
public RecordPipe(Integer overrideCapacity)
|
||||
{
|
||||
this.capacity = overrideCapacity;
|
||||
queue = new ArrayBlockingQueue<>(overrideCapacity);
|
||||
}
|
||||
|
||||
@ -215,14 +213,4 @@ public class RecordPipe
|
||||
this.postRecordActions = postRecordActions;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for capacity
|
||||
**
|
||||
*******************************************************************************/
|
||||
public int getCapacity()
|
||||
{
|
||||
return capacity;
|
||||
}
|
||||
}
|
||||
|
@ -136,19 +136,22 @@ public class ReplaceAction extends AbstractQActionFunction<ReplaceInput, Replace
|
||||
UpdateOutput updateOutput = new UpdateAction().execute(updateInput);
|
||||
output.setUpdateOutput(updateOutput);
|
||||
|
||||
QQueryFilter deleteFilter = new QQueryFilter(new QFilterCriteria(primaryKeyField, QCriteriaOperator.NOT_IN, primaryKeysToKeep));
|
||||
if(input.getFilter() != null)
|
||||
if(input.getPerformDeletes())
|
||||
{
|
||||
deleteFilter.addSubFilter(input.getFilter());
|
||||
}
|
||||
QQueryFilter deleteFilter = new QQueryFilter(new QFilterCriteria(primaryKeyField, QCriteriaOperator.NOT_IN, primaryKeysToKeep));
|
||||
if(input.getFilter() != null)
|
||||
{
|
||||
deleteFilter.addSubFilter(input.getFilter());
|
||||
}
|
||||
|
||||
DeleteInput deleteInput = new DeleteInput();
|
||||
deleteInput.setTableName(table.getName());
|
||||
deleteInput.setQueryFilter(deleteFilter);
|
||||
deleteInput.setTransaction(transaction);
|
||||
deleteInput.setOmitDmlAudit(input.getOmitDmlAudit());
|
||||
DeleteOutput deleteOutput = new DeleteAction().execute(deleteInput);
|
||||
output.setDeleteOutput(deleteOutput);
|
||||
DeleteInput deleteInput = new DeleteInput();
|
||||
deleteInput.setTableName(table.getName());
|
||||
deleteInput.setQueryFilter(deleteFilter);
|
||||
deleteInput.setTransaction(transaction);
|
||||
deleteInput.setOmitDmlAudit(input.getOmitDmlAudit());
|
||||
DeleteOutput deleteOutput = new DeleteAction().execute(deleteInput);
|
||||
output.setDeleteOutput(deleteOutput);
|
||||
}
|
||||
|
||||
if(weOwnTheTransaction)
|
||||
{
|
||||
|
@ -39,6 +39,7 @@ public class ReplaceInput extends AbstractTableActionInput
|
||||
private UniqueKey key;
|
||||
private List<QRecord> records;
|
||||
private QQueryFilter filter;
|
||||
private boolean performDeletes = true;
|
||||
|
||||
private boolean omitDmlAudit = false;
|
||||
|
||||
@ -207,4 +208,35 @@ public class ReplaceInput extends AbstractTableActionInput
|
||||
return (this);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for performDeletes
|
||||
*******************************************************************************/
|
||||
public boolean getPerformDeletes()
|
||||
{
|
||||
return (this.performDeletes);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Setter for performDeletes
|
||||
*******************************************************************************/
|
||||
public void setPerformDeletes(boolean performDeletes)
|
||||
{
|
||||
this.performDeletes = performDeletes;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Fluent setter for performDeletes
|
||||
*******************************************************************************/
|
||||
public ReplaceInput withPerformDeletes(boolean performDeletes)
|
||||
{
|
||||
this.performDeletes = performDeletes;
|
||||
return (this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -34,6 +34,7 @@ import java.util.Map;
|
||||
public abstract class QWidgetData
|
||||
{
|
||||
private String label;
|
||||
private String sublabel;
|
||||
private String footerHTML;
|
||||
private List<String> dropdownNameList;
|
||||
private List<String> dropdownLabelList;
|
||||
@ -51,6 +52,7 @@ public abstract class QWidgetData
|
||||
private List<List<Serializable>> csvData;
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for type
|
||||
*******************************************************************************/
|
||||
@ -356,4 +358,35 @@ public abstract class QWidgetData
|
||||
return (this);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for sublabel
|
||||
*******************************************************************************/
|
||||
public String getSublabel()
|
||||
{
|
||||
return (this.sublabel);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Setter for sublabel
|
||||
*******************************************************************************/
|
||||
public void setSublabel(String sublabel)
|
||||
{
|
||||
this.sublabel = sublabel;
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Fluent setter for sublabel
|
||||
*******************************************************************************/
|
||||
public QWidgetData withSublabel(String sublabel)
|
||||
{
|
||||
this.sublabel = sublabel;
|
||||
return (this);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
||||
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
@ -37,11 +38,11 @@ import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||
** should be written to the output object's Records, noting that when running
|
||||
** as a streamed-ETL process, those input & output objects will be instances of
|
||||
** the StreamedBackendStep{Input,Output} classes, that will be associated with
|
||||
** a page of records flowing through a pipe.
|
||||
** a page of records flowing thorugh a pipe.
|
||||
**
|
||||
** Also - use the transaction member variable!!!
|
||||
*******************************************************************************/
|
||||
public abstract class AbstractLoadStep
|
||||
public abstract class AbstractLoadStep implements BackendStep
|
||||
{
|
||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||
protected QSession session;
|
||||
@ -50,25 +51,6 @@ public abstract class AbstractLoadStep
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Deprecated
|
||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
runOnePage(runBackendStepInput, runBackendStepOutput);
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** todo - make abstract when run is deleted.
|
||||
*******************************************************************************/
|
||||
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Allow subclasses to do an action before the run is complete - before any
|
||||
** pages of records are passed in.
|
||||
|
@ -24,6 +24,7 @@ package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwit
|
||||
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||
@ -39,33 +40,12 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp
|
||||
** a page of records flowing through a pipe.
|
||||
**
|
||||
*******************************************************************************/
|
||||
public abstract class AbstractTransformStep implements ProcessSummaryProviderInterface
|
||||
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
||||
{
|
||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Deprecated
|
||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
runOnePage(runBackendStepInput, runBackendStepOutput);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** todo - make abstract when run is deleted.
|
||||
*******************************************************************************/
|
||||
public void runOnePage(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Allow subclasses to do an action before the run is complete - before any
|
||||
** pages of records are passed in.
|
||||
|
@ -63,7 +63,7 @@ public class BaseStreamedETLStep
|
||||
protected AbstractTransformStep getTransformStep(RunBackendStepInput runBackendStepInput)
|
||||
{
|
||||
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_TRANSFORM_CODE);
|
||||
return (QCodeLoader.getAdHoc(AbstractTransformStep.class, codeReference));
|
||||
return (QCodeLoader.getBackendStep(AbstractTransformStep.class, codeReference));
|
||||
}
|
||||
|
||||
|
||||
@ -74,7 +74,7 @@ public class BaseStreamedETLStep
|
||||
protected AbstractLoadStep getLoadStep(RunBackendStepInput runBackendStepInput)
|
||||
{
|
||||
QCodeReference codeReference = (QCodeReference) runBackendStepInput.getValue(StreamedETLWithFrontendProcess.FIELD_LOAD_CODE);
|
||||
return (QCodeLoader.getAdHoc(AbstractLoadStep.class, codeReference));
|
||||
return (QCodeLoader.getBackendStep(AbstractLoadStep.class, codeReference));
|
||||
}
|
||||
|
||||
|
||||
|
@ -83,32 +83,23 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
||||
// before it can put more records in. //
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
RecordPipe recordPipe;
|
||||
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
|
||||
Integer overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
overrideRecordPipeCapacity = loadStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + loadStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
recordPipe = new RecordPipe();
|
||||
}
|
||||
recordPipe = new RecordPipe();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -81,41 +81,13 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
|
||||
// basically repeat the preview step, but with no limit //
|
||||
//////////////////////////////////////////////////////////
|
||||
runBackendStepInput.getAsyncJobCallback().updateStatus("Validating Records");
|
||||
|
||||
RecordPipe recordPipe = new RecordPipe();
|
||||
AbstractExtractStep extractStep = getExtractStep(runBackendStepInput);
|
||||
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
|
||||
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
// let the transform step override the capacity for the record pipe //
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
RecordPipe recordPipe;
|
||||
Integer overrideRecordPipeCapacity = runBackendStepInput.getValueInteger("recordPipeCapacity");
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per input value [recordPipeCapacity], we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
overrideRecordPipeCapacity = transformStep.getOverrideRecordPipeCapacity(runBackendStepInput);
|
||||
if(overrideRecordPipeCapacity != null)
|
||||
{
|
||||
recordPipe = new RecordPipe(overrideRecordPipeCapacity);
|
||||
LOG.debug("per " + transformStep.getClass().getName() + ", we are overriding record pipe capacity to: " + overrideRecordPipeCapacity);
|
||||
}
|
||||
else
|
||||
{
|
||||
recordPipe = new RecordPipe();
|
||||
}
|
||||
}
|
||||
|
||||
/////////////////////////////
|
||||
// set up the extract step //
|
||||
/////////////////////////////
|
||||
extractStep.setLimit(null);
|
||||
extractStep.setRecordPipe(recordPipe);
|
||||
extractStep.preRun(runBackendStepInput, runBackendStepOutput);
|
||||
|
||||
AbstractTransformStep transformStep = getTransformStep(runBackendStepInput);
|
||||
transformStep.preRun(runBackendStepInput, runBackendStepOutput);
|
||||
|
||||
List<QRecord> previewRecordList = new ArrayList<>();
|
||||
|
@ -151,7 +151,6 @@ public class StreamedETLWithFrontendProcess
|
||||
.withField(new QFieldMetaData(FIELD_DEFAULT_QUERY_FILTER, QFieldType.STRING).withDefaultValue(defaultFieldValues.get(FIELD_DEFAULT_QUERY_FILTER)))
|
||||
.withField(new QFieldMetaData(FIELD_EXTRACT_CODE, QFieldType.STRING).withDefaultValue(extractStepClass == null ? null : new QCodeReference(extractStepClass)))
|
||||
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE, QFieldType.STRING).withDefaultValue(transformStepClass == null ? null : new QCodeReference(transformStepClass)))
|
||||
.withField(new QFieldMetaData(FIELD_TRANSFORM_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractTransformStep.class.getName()))
|
||||
.withField(new QFieldMetaData(FIELD_PREVIEW_MESSAGE, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_PREVIEW_MESSAGE, DEFAULT_PREVIEW_MESSAGE_FOR_INSERT)))
|
||||
.withField(new QFieldMetaData(FIELD_TRANSACTION_LEVEL, QFieldType.STRING).withDefaultValue(defaultFieldValues.getOrDefault(FIELD_TRANSACTION_LEVEL, TRANSACTION_LEVEL_PROCESS)))
|
||||
);
|
||||
@ -171,8 +170,7 @@ public class StreamedETLWithFrontendProcess
|
||||
.withName(STEP_NAME_EXECUTE)
|
||||
.withCode(new QCodeReference(StreamedETLExecuteStep.class))
|
||||
.withInputData(new QFunctionInputMetaData()
|
||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass)))
|
||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE + "_expectedType", QFieldType.STRING).withDefaultValue(AbstractLoadStep.class.getName())))
|
||||
.withField(new QFieldMetaData(FIELD_LOAD_CODE, QFieldType.STRING).withDefaultValue(loadStepClass == null ? null : new QCodeReference(loadStepClass))))
|
||||
.withOutputMetaData(new QFunctionOutputMetaData()
|
||||
.withField(new QFieldMetaData(FIELD_PROCESS_SUMMARY, QFieldType.STRING))
|
||||
);
|
||||
|
Reference in New Issue
Block a user