mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
QQQ-37 update streamed-etl steps to not have to use different record-list
This commit is contained in:
@ -51,6 +51,10 @@ public class RunBackendStepInput extends AbstractActionInput
|
|||||||
private QProcessCallback callback;
|
private QProcessCallback callback;
|
||||||
private AsyncJobCallback asyncJobCallback;
|
private AsyncJobCallback asyncJobCallback;
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
// note - new fields should generally be added in method: cloneFieldsInto //
|
||||||
|
////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -85,6 +89,25 @@ public class RunBackendStepInput extends AbstractActionInput
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Kinda like a reverse copy-constructor -- for a subclass that wants all the
|
||||||
|
** field values from this object. Keep this in sync with the fields in this class!
|
||||||
|
**
|
||||||
|
** Of note - the processState does NOT get cloned - because... well, in our first
|
||||||
|
** use-case (a subclass that doesn't WANT the same/full state), that's what we needed.
|
||||||
|
*******************************************************************************/
|
||||||
|
public void cloneFieldsInto(RunBackendStepInput target)
|
||||||
|
{
|
||||||
|
target.setStepName(getStepName());
|
||||||
|
target.setSession(getSession());
|
||||||
|
target.setTableName(getTableName());
|
||||||
|
target.setProcessName(getProcessName());
|
||||||
|
target.setAsyncJobCallback(getAsyncJobCallback());
|
||||||
|
target.setValues(getValues());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@ -429,4 +452,5 @@ public class RunBackendStepInput extends AbstractActionInput
|
|||||||
}
|
}
|
||||||
return (asyncJobCallback);
|
return (asyncJobCallback);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,31 +22,27 @@
|
|||||||
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
||||||
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Base class for the Load (aka, store) logic of Streamed ETL processes.
|
** Base class for the Load (aka, store) logic of Streamed ETL processes.
|
||||||
**
|
**
|
||||||
** Records are to be read out of the inputRecordPage field, and after storing,
|
** Records are to be read out of the input object's Records field, and after storing,
|
||||||
** should be written to the outputRecordPage. That is to say, DO NOT use the
|
** should be written to the output object's Records, noting that when running
|
||||||
** recordList in the step input/output objects.
|
** as a streamed-ETL process, those input & output objects will be instances of
|
||||||
|
** the StreamedBackendStep{Input,Output} classes, that will be associated with
|
||||||
|
** a page of records flowing thorugh a pipe.
|
||||||
**
|
**
|
||||||
** Also - use the transaction member variable!!!
|
** Also - use the transaction member variable!!!
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public abstract class AbstractLoadStep implements BackendStep
|
public abstract class AbstractLoadStep implements BackendStep
|
||||||
{
|
{
|
||||||
private List<QRecord> inputRecordPage = new ArrayList<>();
|
|
||||||
private List<QRecord> outputRecordPage = new ArrayList<>();
|
|
||||||
|
|
||||||
private Optional<QBackendTransaction> transaction = Optional.empty();
|
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||||
|
|
||||||
|
|
||||||
@ -87,50 +83,6 @@ public abstract class AbstractLoadStep implements BackendStep
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Getter for recordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public List<QRecord> getInputRecordPage()
|
|
||||||
{
|
|
||||||
return inputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Setter for recordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public void setInputRecordPage(List<QRecord> inputRecordPage)
|
|
||||||
{
|
|
||||||
this.inputRecordPage = inputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Getter for outputRecordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public List<QRecord> getOutputRecordPage()
|
|
||||||
{
|
|
||||||
return outputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Setter for outputRecordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public void setOutputRecordPage(List<QRecord> outputRecordPage)
|
|
||||||
{
|
|
||||||
this.outputRecordPage = outputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Setter for transaction
|
** Setter for transaction
|
||||||
**
|
**
|
||||||
|
@ -22,27 +22,24 @@
|
|||||||
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
||||||
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Base class for the Transform logic of Streamed ETL processes.
|
** Base class for the Transform logic of Streamed ETL processes.
|
||||||
**
|
**
|
||||||
** Records are to be read out of the inputRecordPage field, and after transformation,
|
** Records are to be read out of the input object's Records field, and after storing,
|
||||||
** should be written to the outputRecordPage. That is to say, DO NOT use the
|
** should be written to the output object's Records, noting that when running
|
||||||
** recordList in the step input/output objects.
|
** as a streamed-ETL process, those input & output objects will be instances of
|
||||||
|
** the StreamedBackendStep{Input,Output} classes, that will be associated with
|
||||||
|
** a page of records flowing through a pipe.
|
||||||
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
||||||
{
|
{
|
||||||
private List<QRecord> inputRecordPage = new ArrayList<>();
|
|
||||||
private List<QRecord> outputRecordPage = new ArrayList<>();
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -69,47 +66,4 @@ public abstract class AbstractTransformStep implements BackendStep, ProcessSumma
|
|||||||
////////////////////////
|
////////////////////////
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Getter for recordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public List<QRecord> getInputRecordPage()
|
|
||||||
{
|
|
||||||
return inputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Setter for recordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public void setInputRecordPage(List<QRecord> inputRecordPage)
|
|
||||||
{
|
|
||||||
this.inputRecordPage = inputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Getter for outputRecordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public List<QRecord> getOutputRecordPage()
|
|
||||||
{
|
|
||||||
return outputRecordPage;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
|
||||||
** Setter for outputRecordPage
|
|
||||||
**
|
|
||||||
*******************************************************************************/
|
|
||||||
public void setOutputRecordPage(List<QRecord> outputRecordPage)
|
|
||||||
{
|
|
||||||
this.outputRecordPage = outputRecordPage;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -52,10 +52,10 @@ public class LoadViaInsertStep extends AbstractLoadStep
|
|||||||
InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance());
|
InsertInput insertInput = new InsertInput(runBackendStepInput.getInstance());
|
||||||
insertInput.setSession(runBackendStepInput.getSession());
|
insertInput.setSession(runBackendStepInput.getSession());
|
||||||
insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
|
insertInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
|
||||||
insertInput.setRecords(getInputRecordPage());
|
insertInput.setRecords(runBackendStepInput.getRecords());
|
||||||
getTransaction().ifPresent(insertInput::setTransaction);
|
getTransaction().ifPresent(insertInput::setTransaction);
|
||||||
InsertOutput insertOutput = new InsertAction().execute(insertInput);
|
InsertOutput insertOutput = new InsertAction().execute(insertInput);
|
||||||
getOutputRecordPage().addAll(insertOutput.getRecords());
|
runBackendStepOutput.getRecords().addAll(insertOutput.getRecords());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -54,10 +54,10 @@ public class LoadViaUpdateStep extends AbstractLoadStep
|
|||||||
UpdateInput updateInput = new UpdateInput(runBackendStepInput.getInstance());
|
UpdateInput updateInput = new UpdateInput(runBackendStepInput.getInstance());
|
||||||
updateInput.setSession(runBackendStepInput.getSession());
|
updateInput.setSession(runBackendStepInput.getSession());
|
||||||
updateInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
|
updateInput.setTableName(runBackendStepInput.getValueString(FIELD_DESTINATION_TABLE));
|
||||||
updateInput.setRecords(getInputRecordPage());
|
updateInput.setRecords(runBackendStepInput.getRecords());
|
||||||
getTransaction().ifPresent(updateInput::setTransaction);
|
getTransaction().ifPresent(updateInput::setTransaction);
|
||||||
UpdateOutput updateOutput = new UpdateAction().execute(updateInput);
|
UpdateOutput updateOutput = new UpdateAction().execute(updateInput);
|
||||||
getOutputRecordPage().addAll(updateOutput.getRecords());
|
runBackendStepOutput.getRecords().addAll(updateOutput.getRecords());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -0,0 +1,77 @@
|
|||||||
|
/*
|
||||||
|
* QQQ - Low-code Application Framework for Engineers.
|
||||||
|
* Copyright (C) 2021-2022. Kingsrook, LLC
|
||||||
|
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||||
|
* contact@kingsrook.com
|
||||||
|
* https://github.com/Kingsrook/
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Subclass of RunBackendStepInput, meant for use in the pseudo-steps used by
|
||||||
|
** the Streamed-ETL-with-frontend processes - where the Record list is not the
|
||||||
|
** full process's record list - rather - is just a page at a time -- so this class
|
||||||
|
** overrides the getRecords and setRecords method, to just work with that page.
|
||||||
|
**
|
||||||
|
** Note - of importance over time may be the RunBackendStepInput::cloneFieldsInto
|
||||||
|
** method - e.g., if new fields are added to that class!
|
||||||
|
*******************************************************************************/
|
||||||
|
public class StreamedBackendStepInput extends RunBackendStepInput
|
||||||
|
{
|
||||||
|
private List<QRecord> inputRecords;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public StreamedBackendStepInput(RunBackendStepInput runBackendStepInput, List<QRecord> inputRecords)
|
||||||
|
{
|
||||||
|
super(runBackendStepInput.getInstance());
|
||||||
|
runBackendStepInput.cloneFieldsInto(this);
|
||||||
|
this.inputRecords = inputRecords;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void setRecords(List<QRecord> records)
|
||||||
|
{
|
||||||
|
this.inputRecords = records;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public List<QRecord> getRecords()
|
||||||
|
{
|
||||||
|
return (inputRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,74 @@
|
|||||||
|
/*
|
||||||
|
* QQQ - Low-code Application Framework for Engineers.
|
||||||
|
* Copyright (C) 2021-2022. Kingsrook, LLC
|
||||||
|
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||||
|
* contact@kingsrook.com
|
||||||
|
* https://github.com/Kingsrook/
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Subclass of RunBackendStepOutput, meant for use in the pseudo-steps used by
|
||||||
|
** the Streamed-ETL-with-frontend processes - where the Record list is not the
|
||||||
|
** full process's record list - rather - is just a page at a time -- so this class
|
||||||
|
** overrides the getRecords and setRecords method, to just work with that page.
|
||||||
|
*******************************************************************************/
|
||||||
|
public class StreamedBackendStepOutput extends RunBackendStepOutput
|
||||||
|
{
|
||||||
|
private List<QRecord> outputRecords = new ArrayList<>();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public StreamedBackendStepOutput(RunBackendStepOutput runBackendStepOutput)
|
||||||
|
{
|
||||||
|
super();
|
||||||
|
setValues(runBackendStepOutput.getValues());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void setRecords(List<QRecord> records)
|
||||||
|
{
|
||||||
|
this.outputRecords = records;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public List<QRecord> getRecords()
|
||||||
|
{
|
||||||
|
return (outputRecords);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -144,27 +144,32 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
|||||||
///////////////////////////////////
|
///////////////////////////////////
|
||||||
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
|
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
// make streamed input & output objects from the run input & outputs //
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords);
|
||||||
|
StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
|
||||||
|
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
// pass the records through the transform function //
|
// pass the records through the transform function //
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
transformStep.setInputRecordPage(qRecords);
|
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
|
||||||
transformStep.setOutputRecordPage(new ArrayList<>());
|
|
||||||
transformStep.run(runBackendStepInput, runBackendStepOutput);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////
|
////////////////////////////////////////////////
|
||||||
// pass the records through the load function //
|
// pass the records through the load function //
|
||||||
////////////////////////////////////////////////
|
////////////////////////////////////////////////
|
||||||
loadStep.setInputRecordPage(transformStep.getOutputRecordPage());
|
streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, streamedBackendStepOutput.getRecords());
|
||||||
loadStep.setOutputRecordPage(new ArrayList<>());
|
streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
|
||||||
loadStep.run(runBackendStepInput, runBackendStepOutput);
|
|
||||||
|
loadStep.run(streamedBackendStepInput, streamedBackendStepOutput);
|
||||||
|
|
||||||
///////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////
|
||||||
// copy a small number of records to the output list //
|
// copy a small number of records to the output list //
|
||||||
///////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < loadStep.getOutputRecordPage().size())
|
while(loadedRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < streamedBackendStepOutput.getRecords().size())
|
||||||
{
|
{
|
||||||
loadedRecordList.add(loadStep.getOutputRecordPage().get(i++));
|
loadedRecordList.add(streamedBackendStepOutput.getRecords().get(i++));
|
||||||
}
|
}
|
||||||
|
|
||||||
currentRowCount += qRecords.size();
|
currentRowCount += qRecords.size();
|
||||||
|
@ -151,16 +151,21 @@ public class StreamedETLPreviewStep extends BaseStreamedETLStep implements Backe
|
|||||||
///////////////////////////////////
|
///////////////////////////////////
|
||||||
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
|
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
// make streamed input & output objects from the run input & outputs //
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords);
|
||||||
|
StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
|
||||||
|
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
// pass the records through the transform function //
|
// pass the records through the transform function //
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
transformStep.setInputRecordPage(qRecords);
|
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
|
||||||
transformStep.run(runBackendStepInput, runBackendStepOutput);
|
|
||||||
|
|
||||||
////////////////////////////////////////////////////
|
////////////////////////////////////////////////////
|
||||||
// add the transformed records to the output list //
|
// add the transformed records to the output list //
|
||||||
////////////////////////////////////////////////////
|
////////////////////////////////////////////////////
|
||||||
transformedRecordList.addAll(transformStep.getOutputRecordPage());
|
transformedRecordList.addAll(streamedBackendStepOutput.getRecords());
|
||||||
|
|
||||||
return (qRecords.size());
|
return (qRecords.size());
|
||||||
}
|
}
|
||||||
|
@ -128,19 +128,24 @@ public class StreamedETLValidateStep extends BaseStreamedETLStep implements Back
|
|||||||
///////////////////////////////////
|
///////////////////////////////////
|
||||||
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
|
List<QRecord> qRecords = recordPipe.consumeAvailableRecords();
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
// make streamed input & output objects from the run input & outputs //
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
StreamedBackendStepInput streamedBackendStepInput = new StreamedBackendStepInput(runBackendStepInput, qRecords);
|
||||||
|
StreamedBackendStepOutput streamedBackendStepOutput = new StreamedBackendStepOutput(runBackendStepOutput);
|
||||||
|
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
// pass the records through the transform function //
|
// pass the records through the transform function //
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
transformStep.setInputRecordPage(qRecords);
|
transformStep.run(streamedBackendStepInput, streamedBackendStepOutput);
|
||||||
transformStep.run(runBackendStepInput, runBackendStepOutput);
|
|
||||||
|
|
||||||
///////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////
|
||||||
// copy a small number of records to the output list //
|
// copy a small number of records to the output list //
|
||||||
///////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////
|
||||||
int i = 0;
|
int i = 0;
|
||||||
while(previewRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < transformStep.getOutputRecordPage().size())
|
while(previewRecordList.size() < PROCESS_OUTPUT_RECORD_LIST_LIMIT && i < streamedBackendStepOutput.getRecords().size())
|
||||||
{
|
{
|
||||||
previewRecordList.add(transformStep.getOutputRecordPage().get(i++));
|
previewRecordList.add(streamedBackendStepOutput.getRecords().get(i++));
|
||||||
}
|
}
|
||||||
|
|
||||||
currentRowCount += qRecords.size();
|
currentRowCount += qRecords.size();
|
||||||
|
@ -347,12 +347,12 @@ public class StreamedETLWithFrontendProcessTest
|
|||||||
@Override
|
@Override
|
||||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
{
|
{
|
||||||
for(QRecord qRecord : getInputRecordPage())
|
for(QRecord qRecord : runBackendStepInput.getRecords())
|
||||||
{
|
{
|
||||||
QRecord newQRecord = new QRecord();
|
QRecord newQRecord = new QRecord();
|
||||||
newQRecord.setValue("firstName", "Johnny");
|
newQRecord.setValue("firstName", "Johnny");
|
||||||
newQRecord.setValue("lastName", qRecord.getValueString("name"));
|
newQRecord.setValue("lastName", qRecord.getValueString("name"));
|
||||||
getOutputRecordPage().add(newQRecord);
|
runBackendStepOutput.getRecords().add(newQRecord);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -403,7 +403,7 @@ public class StreamedETLWithFrontendProcessTest
|
|||||||
@Override
|
@Override
|
||||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
{
|
{
|
||||||
for(QRecord qRecord : getInputRecordPage())
|
for(QRecord qRecord : runBackendStepInput.getRecords())
|
||||||
{
|
{
|
||||||
if(qRecord.getValueString("name").equals("Circle"))
|
if(qRecord.getValueString("name").equals("Circle"))
|
||||||
{
|
{
|
||||||
@ -414,7 +414,7 @@ public class StreamedETLWithFrontendProcessTest
|
|||||||
QRecord newQRecord = new QRecord();
|
QRecord newQRecord = new QRecord();
|
||||||
newQRecord.setValue("firstName", "Johnny");
|
newQRecord.setValue("firstName", "Johnny");
|
||||||
newQRecord.setValue("lastName", qRecord.getValueString("name"));
|
newQRecord.setValue("lastName", qRecord.getValueString("name"));
|
||||||
getOutputRecordPage().add(newQRecord);
|
runBackendStepOutput.getRecords().add(newQRecord);
|
||||||
|
|
||||||
okSummary.incrementCountAndAddPrimaryKey(qRecord.getValue("id"));
|
okSummary.incrementCountAndAddPrimaryKey(qRecord.getValue("id"));
|
||||||
}
|
}
|
||||||
@ -437,12 +437,12 @@ public class StreamedETLWithFrontendProcessTest
|
|||||||
@Override
|
@Override
|
||||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
{
|
{
|
||||||
for(QRecord qRecord : getInputRecordPage())
|
for(QRecord qRecord : runBackendStepInput.getRecords())
|
||||||
{
|
{
|
||||||
QRecord updatedQRecord = new QRecord();
|
QRecord updatedQRecord = new QRecord();
|
||||||
updatedQRecord.setValue("id", qRecord.getValue("id"));
|
updatedQRecord.setValue("id", qRecord.getValue("id"));
|
||||||
updatedQRecord.setValue("name", "Transformed:" + qRecord.getValueString("name"));
|
updatedQRecord.setValue("name", "Transformed:" + qRecord.getValueString("name"));
|
||||||
getOutputRecordPage().add(updatedQRecord);
|
runBackendStepOutput.getRecords().add(updatedQRecord);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ public class ClonePeopleTransformStep extends AbstractTransformStep implements P
|
|||||||
@Override
|
@Override
|
||||||
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
public void run(RunBackendStepInput runBackendStepInput, RunBackendStepOutput runBackendStepOutput) throws QException
|
||||||
{
|
{
|
||||||
for(QRecord inputPerson : getInputRecordPage())
|
for(QRecord inputPerson : runBackendStepInput.getRecords())
|
||||||
{
|
{
|
||||||
Serializable id = inputPerson.getValue("id");
|
Serializable id = inputPerson.getValue("id");
|
||||||
if("Garret".equals(inputPerson.getValueString("firstName")))
|
if("Garret".equals(inputPerson.getValueString("firstName")))
|
||||||
@ -92,7 +92,7 @@ public class ClonePeopleTransformStep extends AbstractTransformStep implements P
|
|||||||
QRecord outputPerson = new QRecord(inputPerson);
|
QRecord outputPerson = new QRecord(inputPerson);
|
||||||
outputPerson.setValue("id", null);
|
outputPerson.setValue("id", null);
|
||||||
outputPerson.setValue("firstName", "Clone of: " + inputPerson.getValueString("firstName"));
|
outputPerson.setValue("firstName", "Clone of: " + inputPerson.getValueString("firstName"));
|
||||||
getOutputRecordPage().add(outputPerson);
|
runBackendStepOutput.getRecords().add(outputPerson);
|
||||||
|
|
||||||
if(inputPerson.getValueString("firstName").matches("Clone of.*"))
|
if(inputPerson.getValueString("firstName").matches("Clone of.*"))
|
||||||
{
|
{
|
||||||
|
@ -91,7 +91,7 @@ class ClonePeopleTransformStepTest
|
|||||||
RunBackendStepOutput output = new RunBackendStepOutput();
|
RunBackendStepOutput output = new RunBackendStepOutput();
|
||||||
ClonePeopleTransformStep clonePeopleTransformStep = new ClonePeopleTransformStep();
|
ClonePeopleTransformStep clonePeopleTransformStep = new ClonePeopleTransformStep();
|
||||||
|
|
||||||
clonePeopleTransformStep.setInputRecordPage(queryOutput.getRecords());
|
input.setRecords(queryOutput.getRecords());
|
||||||
clonePeopleTransformStep.run(input, output);
|
clonePeopleTransformStep.run(input, output);
|
||||||
|
|
||||||
ArrayList<ProcessSummaryLine> processSummary = clonePeopleTransformStep.getProcessSummary(true);
|
ArrayList<ProcessSummaryLine> processSummary = clonePeopleTransformStep.getProcessSummary(true);
|
||||||
|
Reference in New Issue
Block a user