mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Add transaction to transform step and query action (and rdbms query)
This commit is contained in:
@ -22,6 +22,7 @@
|
|||||||
package com.kingsrook.qqq.backend.core.model.actions.tables.query;
|
package com.kingsrook.qqq.backend.core.model.actions.tables.query;
|
||||||
|
|
||||||
|
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
|
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput;
|
import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||||
@ -34,6 +35,7 @@ import com.kingsrook.qqq.backend.core.model.session.QSession;
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public class QueryInput extends AbstractTableActionInput
|
public class QueryInput extends AbstractTableActionInput
|
||||||
{
|
{
|
||||||
|
private QBackendTransaction transaction;
|
||||||
private QQueryFilter filter;
|
private QQueryFilter filter;
|
||||||
private Integer skip;
|
private Integer skip;
|
||||||
private Integer limit;
|
private Integer limit;
|
||||||
@ -44,6 +46,7 @@ public class QueryInput extends AbstractTableActionInput
|
|||||||
private boolean shouldGenerateDisplayValues = false;
|
private boolean shouldGenerateDisplayValues = false;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@ -203,4 +206,39 @@ public class QueryInput extends AbstractTableActionInput
|
|||||||
{
|
{
|
||||||
this.shouldGenerateDisplayValues = shouldGenerateDisplayValues;
|
this.shouldGenerateDisplayValues = shouldGenerateDisplayValues;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for transaction
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public QBackendTransaction getTransaction()
|
||||||
|
{
|
||||||
|
return transaction;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Setter for transaction
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public void setTransaction(QBackendTransaction transaction)
|
||||||
|
{
|
||||||
|
this.transaction = transaction;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Fluent setter for transaction
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public QueryInput withTransaction(QBackendTransaction transaction)
|
||||||
|
{
|
||||||
|
this.transaction = transaction;
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
package com.kingsrook.qqq.backend.core.processes.implementations.etl.streamedwithfrontend;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.Optional;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
import com.kingsrook.qqq.backend.core.actions.processes.BackendStep;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepInput;
|
||||||
@ -40,6 +42,8 @@ import com.kingsrook.qqq.backend.core.model.actions.processes.RunBackendStepOutp
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
public abstract class AbstractTransformStep implements BackendStep, ProcessSummaryProviderInterface
|
||||||
{
|
{
|
||||||
|
private Optional<QBackendTransaction> transaction = Optional.empty();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -66,4 +70,26 @@ public abstract class AbstractTransformStep implements BackendStep, ProcessSumma
|
|||||||
////////////////////////
|
////////////////////////
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Setter for transaction
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public void setTransaction(Optional<QBackendTransaction> transaction)
|
||||||
|
{
|
||||||
|
this.transaction = transaction;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for transaction
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public Optional<QBackendTransaction> getTransaction()
|
||||||
|
{
|
||||||
|
return (transaction);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -72,6 +72,7 @@ public class StreamedETLExecuteStep extends BaseStreamedETLStep implements Backe
|
|||||||
|
|
||||||
transaction = loadStep.openTransaction(runBackendStepInput);
|
transaction = loadStep.openTransaction(runBackendStepInput);
|
||||||
loadStep.setTransaction(transaction);
|
loadStep.setTransaction(transaction);
|
||||||
|
transformStep.setTransaction(transaction);
|
||||||
|
|
||||||
List<QRecord> loadedRecordList = new ArrayList<>();
|
List<QRecord> loadedRecordList = new ArrayList<>();
|
||||||
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) ->
|
int recordCount = new AsyncRecordPipeLoop().run("StreamedETL>Execute>ExtractStep", null, recordPipe, (status) ->
|
||||||
|
@ -99,10 +99,22 @@ public class RDBMSQueryAction extends AbstractRDBMSAction implements QueryInterf
|
|||||||
|
|
||||||
// todo sql customization - can edit sql and/or param list
|
// todo sql customization - can edit sql and/or param list
|
||||||
|
|
||||||
QueryOutput queryOutput = new QueryOutput(queryInput);
|
Connection connection;
|
||||||
|
boolean needToCloseConnection = false;
|
||||||
try(Connection connection = getConnection(queryInput))
|
if(queryInput.getTransaction() != null && queryInput.getTransaction() instanceof RDBMSTransaction rdbmsTransaction)
|
||||||
{
|
{
|
||||||
|
LOG.debug("Using connection from queryInput [" + rdbmsTransaction.getConnection() + "]");
|
||||||
|
connection = rdbmsTransaction.getConnection();
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
connection = getConnection(queryInput);
|
||||||
|
needToCloseConnection = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
QueryOutput queryOutput = new QueryOutput(queryInput);
|
||||||
PreparedStatement statement = createStatement(connection, sql, queryInput);
|
PreparedStatement statement = createStatement(connection, sql, queryInput);
|
||||||
QueryManager.executeStatement(statement, ((ResultSet resultSet) ->
|
QueryManager.executeStatement(statement, ((ResultSet resultSet) ->
|
||||||
{
|
{
|
||||||
@ -132,10 +144,17 @@ public class RDBMSQueryAction extends AbstractRDBMSAction implements QueryInterf
|
|||||||
}
|
}
|
||||||
|
|
||||||
}), params);
|
}), params);
|
||||||
}
|
|
||||||
|
|
||||||
return queryOutput;
|
return queryOutput;
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if(needToCloseConnection)
|
||||||
|
{
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
catch(Exception e)
|
catch(Exception e)
|
||||||
{
|
{
|
||||||
LOG.warn("Error executing query", e);
|
LOG.warn("Error executing query", e);
|
||||||
|
@ -217,9 +217,8 @@ public class QueryManager
|
|||||||
{
|
{
|
||||||
return (T) Long.valueOf(((Integer) object));
|
return (T) Long.valueOf(((Integer) object));
|
||||||
}
|
}
|
||||||
else if(object instanceof Timestamp && returnClass.equals(LocalDateTime.class))
|
else if(object instanceof Timestamp timestamp && returnClass.equals(LocalDateTime.class))
|
||||||
{
|
{
|
||||||
Timestamp timestamp = (Timestamp) object;
|
|
||||||
return ((T) LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp.getTime()), ZoneId.systemDefault()));
|
return ((T) LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp.getTime()), ZoneId.systemDefault()));
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -23,8 +23,11 @@ package com.kingsrook.qqq.backend.module.rdbms.actions;
|
|||||||
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.QBackendTransaction;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
||||||
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
||||||
@ -447,4 +450,36 @@ public class RDBMSQueryActionTest extends RDBMSActionTest
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testLookInsideTransaction() throws QException
|
||||||
|
{
|
||||||
|
InsertInput insertInput = new InsertInput(TestUtils.defineInstance());
|
||||||
|
insertInput.setSession(new QSession());
|
||||||
|
insertInput.setTableName(TestUtils.defineTablePerson().getName());
|
||||||
|
|
||||||
|
InsertAction insertAction = new InsertAction();
|
||||||
|
QBackendTransaction transaction = insertAction.openTransaction(insertInput);
|
||||||
|
|
||||||
|
insertInput.setTransaction(transaction);
|
||||||
|
insertInput.setRecords(List.of(
|
||||||
|
new QRecord().withValue("firstName", "George").withValue("lastName", "Washington").withValue("email", "gw@kingsrook.com")
|
||||||
|
));
|
||||||
|
|
||||||
|
insertAction.execute(insertInput);
|
||||||
|
|
||||||
|
QueryInput queryInput = initQueryRequest();
|
||||||
|
QueryOutput queryOutput = new QueryAction().execute(queryInput);
|
||||||
|
Assertions.assertEquals(5, queryOutput.getRecords().size(), "Query without the transaction should not see the new row.");
|
||||||
|
|
||||||
|
queryInput = initQueryRequest();
|
||||||
|
queryInput.setTransaction(transaction);
|
||||||
|
queryOutput = new QueryAction().execute(queryInput);
|
||||||
|
Assertions.assertEquals(6, queryOutput.getRecords().size(), "Query with the transaction should see the new row.");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Reference in New Issue
Block a user