mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 05:01:07 +00:00
Introduce RecordPipeBufferedWrapper, to be used in QueryAction when includingAssociations and writing to a pipe
This commit is contained in:
@ -0,0 +1,79 @@
|
|||||||
|
/*
|
||||||
|
* QQQ - Low-code Application Framework for Engineers.
|
||||||
|
* Copyright (C) 2021-2022. Kingsrook, LLC
|
||||||
|
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||||
|
* contact@kingsrook.com
|
||||||
|
* https://github.com/Kingsrook/
|
||||||
|
*
|
||||||
|
* This program is free software: you can redistribute it and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License as
|
||||||
|
* published by the Free Software Foundation, either version 3 of the
|
||||||
|
* License, or (at your option) any later version.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful,
|
||||||
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||||
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||||
|
* GNU Affero General Public License for more details.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package com.kingsrook.qqq.backend.core.actions.reporting;
|
||||||
|
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Subclass of BufferedRecordPipe, which ultimately sends records down to an
|
||||||
|
** original RecordPipe.
|
||||||
|
**
|
||||||
|
** Meant to be used where: someone passed in a RecordPipe (so they have a reference
|
||||||
|
** to it, and they are waiting to read from it), but the producer knows that
|
||||||
|
** it will be better to buffer the records, so they want to use a buffered pipe
|
||||||
|
** (but they still need the records to end up in the original pipe - thus -
|
||||||
|
** it gets wrapped by an object of this class).
|
||||||
|
*******************************************************************************/
|
||||||
|
public class RecordPipeBufferedWrapper extends BufferedRecordPipe
|
||||||
|
{
|
||||||
|
private RecordPipe wrappedPipe;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Constructor - uses default buffer size
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public RecordPipeBufferedWrapper(RecordPipe wrappedPipe)
|
||||||
|
{
|
||||||
|
this.wrappedPipe = wrappedPipe;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Constructor - customize buffer size.
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public RecordPipeBufferedWrapper(Integer bufferSize, RecordPipe wrappedPipe)
|
||||||
|
{
|
||||||
|
super(bufferSize);
|
||||||
|
this.wrappedPipe = wrappedPipe;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** when it's time to actually add records into the pipe, actually add them
|
||||||
|
** into the wrapped pipe!
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void addRecords(List<QRecord> records) throws QException
|
||||||
|
{
|
||||||
|
wrappedPipe.addRecords(records);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -34,6 +34,7 @@ import com.kingsrook.qqq.backend.core.actions.customizers.AbstractPostQueryCusto
|
|||||||
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
|
import com.kingsrook.qqq.backend.core.actions.customizers.QCodeLoader;
|
||||||
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
|
import com.kingsrook.qqq.backend.core.actions.customizers.TableCustomizers;
|
||||||
import com.kingsrook.qqq.backend.core.actions.reporting.BufferedRecordPipe;
|
import com.kingsrook.qqq.backend.core.actions.reporting.BufferedRecordPipe;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipeBufferedWrapper;
|
||||||
import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator;
|
import com.kingsrook.qqq.backend.core.actions.values.QPossibleValueTranslator;
|
||||||
import com.kingsrook.qqq.backend.core.actions.values.QValueFormatter;
|
import com.kingsrook.qqq.backend.core.actions.values.QValueFormatter;
|
||||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||||
@ -83,6 +84,15 @@ public class QueryAction
|
|||||||
if(queryInput.getRecordPipe() != null)
|
if(queryInput.getRecordPipe() != null)
|
||||||
{
|
{
|
||||||
queryInput.getRecordPipe().setPostRecordActions(this::postRecordActions);
|
queryInput.getRecordPipe().setPostRecordActions(this::postRecordActions);
|
||||||
|
|
||||||
|
if(queryInput.getIncludeAssociations())
|
||||||
|
{
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// if the user requested to include associations, it's important that that is buffered, //
|
||||||
|
// (for performance reasons), so, wrap the user's pipe with a buffer //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
queryInput.setRecordPipe(new RecordPipeBufferedWrapper(queryInput.getRecordPipe()));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
QBackendModuleDispatcher qBackendModuleDispatcher = new QBackendModuleDispatcher();
|
QBackendModuleDispatcher qBackendModuleDispatcher = new QBackendModuleDispatcher();
|
||||||
@ -111,6 +121,7 @@ public class QueryAction
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
private void manageAssociations(QueryInput queryInput, List<QRecord> queryOutputRecords) throws QException
|
private void manageAssociations(QueryInput queryInput, List<QRecord> queryOutputRecords) throws QException
|
||||||
{
|
{
|
||||||
|
LOG.info("In manageAssociations for " + queryInput.getTableName() + " with " + queryOutputRecords.size() + " records");
|
||||||
QTableMetaData table = queryInput.getTable();
|
QTableMetaData table = queryInput.getTable();
|
||||||
for(Association association : CollectionUtils.nonNullList(table.getAssociations()))
|
for(Association association : CollectionUtils.nonNullList(table.getAssociations()))
|
||||||
{
|
{
|
||||||
|
@ -261,7 +261,7 @@ public class ValidateRecordSecurityLockHelper
|
|||||||
QSecurityKeyType securityKeyType = QContext.getQInstance().getSecurityKeyType(recordSecurityLock.getSecurityKeyType());
|
QSecurityKeyType securityKeyType = QContext.getQInstance().getSecurityKeyType(recordSecurityLock.getSecurityKeyType());
|
||||||
if(StringUtils.hasContent(securityKeyType.getAllAccessKeyName()) && QContext.getQSession().hasSecurityKeyValue(securityKeyType.getAllAccessKeyName(), true, QFieldType.BOOLEAN))
|
if(StringUtils.hasContent(securityKeyType.getAllAccessKeyName()) && QContext.getQSession().hasSecurityKeyValue(securityKeyType.getAllAccessKeyName(), true, QFieldType.BOOLEAN))
|
||||||
{
|
{
|
||||||
LOG.debug("Session has " + securityKeyType.getAllAccessKeyName() + " - not checking this lock.");
|
LOG.trace("Session has " + securityKeyType.getAllAccessKeyName() + " - not checking this lock.");
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -26,6 +26,7 @@ import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface;
|
|||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
||||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
|
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
|
||||||
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -43,7 +44,16 @@ public class MemoryQueryAction implements QueryInterface
|
|||||||
try
|
try
|
||||||
{
|
{
|
||||||
QueryOutput queryOutput = new QueryOutput(queryInput);
|
QueryOutput queryOutput = new QueryOutput(queryInput);
|
||||||
queryOutput.addRecords(MemoryRecordStore.getInstance().query(queryInput));
|
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// add the records to the output one-by-one -- this more closely matches how "real" backends perform //
|
||||||
|
// and works better w/ pipes //
|
||||||
|
///////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
for(QRecord qRecord : MemoryRecordStore.getInstance().query(queryInput))
|
||||||
|
{
|
||||||
|
queryOutput.addRecord(qRecord);
|
||||||
|
}
|
||||||
|
|
||||||
return (queryOutput);
|
return (queryOutput);
|
||||||
}
|
}
|
||||||
catch(Exception e)
|
catch(Exception e)
|
||||||
|
@ -25,6 +25,7 @@ package com.kingsrook.qqq.backend.core.actions.tables;
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import com.kingsrook.qqq.backend.core.BaseTest;
|
import com.kingsrook.qqq.backend.core.BaseTest;
|
||||||
|
import com.kingsrook.qqq.backend.core.actions.async.AsyncRecordPipeLoop;
|
||||||
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
|
import com.kingsrook.qqq.backend.core.actions.reporting.RecordPipe;
|
||||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
@ -200,6 +201,41 @@ class QueryActionTest extends BaseTest
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testQueryManyRecordsAssociationsWithPipe() throws QException
|
||||||
|
{
|
||||||
|
QContext.getQSession().withSecurityKeyValue(TestUtils.SECURITY_KEY_TYPE_STORE_ALL_ACCESS, true);
|
||||||
|
insertNOrdersWithAssociations(2500);
|
||||||
|
|
||||||
|
RecordPipe pipe = new RecordPipe(1000);
|
||||||
|
QueryInput queryInput = new QueryInput();
|
||||||
|
queryInput.setTableName(TestUtils.TABLE_NAME_ORDER);
|
||||||
|
queryInput.setRecordPipe(pipe);
|
||||||
|
queryInput.setIncludeAssociations(true);
|
||||||
|
|
||||||
|
int recordsConsumed = new AsyncRecordPipeLoop().run("Test", null, pipe, (callback) ->
|
||||||
|
{
|
||||||
|
new QueryAction().execute(queryInput);
|
||||||
|
return (true);
|
||||||
|
}, () ->
|
||||||
|
{
|
||||||
|
List<QRecord> records = pipe.consumeAvailableRecords();
|
||||||
|
for(QRecord record : records)
|
||||||
|
{
|
||||||
|
assertEquals(1, record.getAssociatedRecords().get("orderLine").size());
|
||||||
|
assertEquals(1, record.getAssociatedRecords().get("extrinsics").size());
|
||||||
|
}
|
||||||
|
return (records.size());
|
||||||
|
});
|
||||||
|
|
||||||
|
assertEquals(2500, recordsConsumed);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@ -356,4 +392,25 @@ class QueryActionTest extends BaseTest
|
|||||||
));
|
));
|
||||||
new InsertAction().execute(insertInput);
|
new InsertAction().execute(insertInput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
private static void insertNOrdersWithAssociations(int n) throws QException
|
||||||
|
{
|
||||||
|
List<QRecord> recordList = new ArrayList<>();
|
||||||
|
for(int i = 0; i < n; i++)
|
||||||
|
{
|
||||||
|
recordList.add(new QRecord().withValue("storeId", 1).withValue("orderNo", "ORD" + i)
|
||||||
|
.withAssociatedRecord("orderLine", new QRecord().withValue("sku", "BASIC1").withValue("quantity", 3))
|
||||||
|
.withAssociatedRecord("extrinsics", new QRecord().withValue("key", "YOUR-FIELD").withValue("value", "YOUR-VALUE")));
|
||||||
|
}
|
||||||
|
|
||||||
|
InsertInput insertInput = new InsertInput();
|
||||||
|
insertInput.setTableName(TestUtils.TABLE_NAME_ORDER);
|
||||||
|
insertInput.setRecords(recordList);
|
||||||
|
new InsertAction().execute(insertInput);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user