Add ExtractViaBasepullQueryStep; add pagination & piping to api query

This commit is contained in:
2022-10-26 12:24:35 -05:00
parent 77927fd318
commit 82810c2b66
18 changed files with 689 additions and 54 deletions

View File

@ -22,13 +22,11 @@
package com.kingsrook.qqq.backend.module.api.actions;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.interfaces.QueryInterface;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
@ -55,33 +53,83 @@ public class APIQueryAction extends AbstractAPIAction implements QueryInterface
QTableMetaData table = queryInput.getTable();
preAction(queryInput);
try
QueryOutput queryOutput = new QueryOutput(queryInput);
Integer originalLimit = queryInput.getLimit();
Integer limit = originalLimit;
Integer skip = queryInput.getSkip();
if(limit == null)
{
QQueryFilter filter = queryInput.getFilter();
String paramString = apiActionUtil.buildQueryStringForGet(filter, queryInput.getLimit(), queryInput.getSkip(), table.getFields());
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
HttpClient client = httpClientBuilder.build();
String url = apiActionUtil.buildTableUrl(table) + paramString;
LOG.info("API URL: " + url);
HttpGet request = new HttpGet(url);
apiActionUtil.setupAuthorizationInRequest(request);
apiActionUtil.setupContentTypeInRequest(request);
apiActionUtil.setupAdditionalHeaders(request);
HttpResponse response = client.execute(request);
List<QRecord> queryResults = apiActionUtil.processGetResponse(table, response);
QueryOutput queryOutput = new QueryOutput(queryInput);
queryOutput.addRecords(queryResults);
return (queryOutput);
limit = apiActionUtil.getApiStandardLimit();
}
catch(Exception e)
int totalCount = 0;
while(true)
{
LOG.warn("Error in API Query", e);
throw new QException("Error executing query: " + e.getMessage(), e);
try
{
QQueryFilter filter = queryInput.getFilter();
String paramString = apiActionUtil.buildQueryStringForGet(filter, limit, skip, table.getFields());
HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();
HttpClient client = httpClientBuilder.build();
String url = apiActionUtil.buildTableUrl(table) + paramString;
LOG.info("API URL: " + url);
///////////////////////////
// todo - 429 handling!! //
///////////////////////////
HttpGet request = new HttpGet(url);
apiActionUtil.setupAuthorizationInRequest(request);
apiActionUtil.setupContentTypeInRequest(request);
apiActionUtil.setupAdditionalHeaders(request);
HttpResponse response = client.execute(request);
int count = apiActionUtil.processGetResponse(table, response, queryOutput);
totalCount += count;
/////////////////////////////////////////////////////////////////////////
// if we've fetched at least as many as the original limit, then break //
/////////////////////////////////////////////////////////////////////////
if(originalLimit != null && totalCount >= originalLimit)
{
return (queryOutput);
}
////////////////////////////////////////////////////////////////////////////////////
// if we got back less than a full page this time, then we must be done, so break //
////////////////////////////////////////////////////////////////////////////////////
if(count == 0 || (limit != null && count < limit))
{
return (queryOutput);
}
///////////////////////////////////////////////////////////////////
// if there's an async callback that says we're cancelled, break //
///////////////////////////////////////////////////////////////////
if(queryInput.getAsyncJobCallback().wasCancelRequested())
{
LOG.info("Breaking query job, as requested.");
return (queryOutput);
}
////////////////////////////////////////////////////////////////////////////
// else, increment the skip by the count we just got, and query for more. //
////////////////////////////////////////////////////////////////////////////
if(skip == null)
{
skip = 0;
}
skip += count;
}
catch(Exception e)
{
LOG.warn("Error in API Query", e);
throw new QException("Error executing query: " + e.getMessage(), e);
}
}
}

View File

@ -26,7 +26,6 @@ import java.io.IOException;
import java.io.Serializable;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
@ -35,6 +34,8 @@ import com.kingsrook.qqq.backend.core.model.actions.AbstractTableActionInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QFilterCriteria;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.fields.QFieldMetaData;
import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
@ -74,7 +75,7 @@ public class BaseAPIActionUtil
*******************************************************************************/
public long getMillisToSleepAfterEveryCall()
{
return 0;
return (0);
}
@ -84,7 +85,7 @@ public class BaseAPIActionUtil
*******************************************************************************/
public int getInitialRateLimitBackoffMillis()
{
return 0;
return (0);
}
@ -94,7 +95,17 @@ public class BaseAPIActionUtil
*******************************************************************************/
public int getMaxAllowedRateLimitErrors()
{
return 0;
return (0);
}
/*******************************************************************************
**
*******************************************************************************/
public Integer getApiStandardLimit()
{
return (20);
}
@ -272,7 +283,7 @@ public class BaseAPIActionUtil
/*******************************************************************************
**
*******************************************************************************/
protected List<QRecord> processGetResponse(QTableMetaData table, HttpResponse response) throws IOException
protected int processGetResponse(QTableMetaData table, HttpResponse response, QueryOutput queryOutput) throws IOException
{
int statusCode = response.getStatusLine().getStatusCode();
System.out.println(statusCode);
@ -285,7 +296,7 @@ public class BaseAPIActionUtil
HttpEntity entity = response.getEntity();
String resultString = EntityUtils.toString(entity);
List<QRecord> recordList = new ArrayList<>();
int count = 0;
if(StringUtils.hasContent(resultString) && !resultString.equals("null"))
{
JSONArray resultList = null;
@ -309,16 +320,18 @@ public class BaseAPIActionUtil
{
for(int i = 0; i < resultList.length(); i++)
{
recordList.add(jsonObjectToRecord(resultList.getJSONObject(i), table.getFields()));
queryOutput.addRecord(jsonObjectToRecord(resultList.getJSONObject(i), table.getFields()));
count++;
}
}
else
{
recordList.add(jsonObjectToRecord(jsonObject, table.getFields()));
queryOutput.addRecord(jsonObjectToRecord(jsonObject, table.getFields()));
count++;
}
}
return (recordList);
return (count);
}
@ -483,8 +496,14 @@ public class BaseAPIActionUtil
*******************************************************************************/
public Integer processGetResponseForCount(QTableMetaData table, HttpResponse response) throws IOException
{
List<QRecord> queryResults = processGetResponse(table, response);
return (queryResults.size());
/////////////////////////////////////////////////////////////////////////////////////////
// set up a query output with a blank query input - e.g., one that isn't using a pipe. //
/////////////////////////////////////////////////////////////////////////////////////////
QueryOutput queryOutput = new QueryOutput(new QueryInput());
processGetResponse(table, response, queryOutput);
List<QRecord> records = queryOutput.getRecords();
return (records == null ? null : records.size());
}
}