mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Merge pull request #57 from Kingsrook/feature/basepull-subtract-seconds
Add option to move timestamps, e.g., to make overlapping windows
This commit is contained in:
@ -72,7 +72,7 @@ import org.apache.commons.lang.BooleanUtils;
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Action handler for running q-processes (which are a sequence of q-functions).
|
** Action handler for running q-processes (which are a sequence of q-steps).
|
||||||
*
|
*
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public class RunProcessAction
|
public class RunProcessAction
|
||||||
@ -82,6 +82,7 @@ public class RunProcessAction
|
|||||||
public static final String BASEPULL_THIS_RUNTIME_KEY = "basepullThisRuntimeKey";
|
public static final String BASEPULL_THIS_RUNTIME_KEY = "basepullThisRuntimeKey";
|
||||||
public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey";
|
public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey";
|
||||||
public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField";
|
public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField";
|
||||||
|
public static final String BASEPULL_CONFIGURATION = "basepullConfiguration";
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// indicator that the timestamp field should be updated - e.g., the execute step is finished. //
|
// indicator that the timestamp field should be updated - e.g., the execute step is finished. //
|
||||||
@ -633,5 +634,6 @@ public class RunProcessAction
|
|||||||
|
|
||||||
runProcessInput.getValues().put(BASEPULL_LAST_RUNTIME_KEY, lastRunTime);
|
runProcessInput.getValues().put(BASEPULL_LAST_RUNTIME_KEY, lastRunTime);
|
||||||
runProcessInput.getValues().put(BASEPULL_TIMESTAMP_FIELD, basepullConfiguration.getTimestampField());
|
runProcessInput.getValues().put(BASEPULL_TIMESTAMP_FIELD, basepullConfiguration.getTimestampField());
|
||||||
|
runProcessInput.getValues().put(BASEPULL_CONFIGURATION, basepullConfiguration);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,9 @@ public class BasepullConfiguration implements Serializable
|
|||||||
|
|
||||||
private String timestampField; // the name of the field in the table being queried against the last-run timestamp.
|
private String timestampField; // the name of the field in the table being queried against the last-run timestamp.
|
||||||
|
|
||||||
|
private Integer secondsToSubtractFromLastRunTimeForTimestampQuery; // option to adjust the query's start-time (based on last run time) by a number of seconds.
|
||||||
|
private Integer secondsToSubtractFromThisRunTimeForTimestampQuery; // option to adjust the query's end-time (based on this run time) by a number of seconds.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -244,4 +247,66 @@ public class BasepullConfiguration implements Serializable
|
|||||||
return (this);
|
return (this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for secondsToSubtractFromLastRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public Integer getSecondsToSubtractFromLastRunTimeForTimestampQuery()
|
||||||
|
{
|
||||||
|
return (this.secondsToSubtractFromLastRunTimeForTimestampQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Setter for secondsToSubtractFromLastRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public void setSecondsToSubtractFromLastRunTimeForTimestampQuery(Integer secondsToSubtractFromLastRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromLastRunTimeForTimestampQuery = secondsToSubtractFromLastRunTimeForTimestampQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Fluent setter for secondsToSubtractFromLastRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public BasepullConfiguration withSecondsToSubtractFromLastRunTimeForTimestampQuery(Integer secondsToSubtractFromLastRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromLastRunTimeForTimestampQuery = secondsToSubtractFromLastRunTimeForTimestampQuery;
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for secondsToSubtractFromThisRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public Integer getSecondsToSubtractFromThisRunTimeForTimestampQuery()
|
||||||
|
{
|
||||||
|
return (this.secondsToSubtractFromThisRunTimeForTimestampQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Setter for secondsToSubtractFromThisRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public void setSecondsToSubtractFromThisRunTimeForTimestampQuery(Integer secondsToSubtractFromThisRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromThisRunTimeForTimestampQuery = secondsToSubtractFromThisRunTimeForTimestampQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Fluent setter for secondsToSubtractFromThisRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public BasepullConfiguration withSecondsToSubtractFromThisRunTimeForTimestampQuery(Integer secondsToSubtractFromThisRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromThisRunTimeForTimestampQuery = secondsToSubtractFromThisRunTimeForTimestampQuery;
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
package com.kingsrook.qqq.backend.core.processes.implementations.basepull;
|
package com.kingsrook.qqq.backend.core.processes.implementations.basepull;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
|
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
@ -122,7 +124,21 @@ public class ExtractViaBasepullQueryStep extends ExtractViaQueryStep
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
protected String getLastRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
protected String getLastRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
||||||
{
|
{
|
||||||
return (runBackendStepInput.getBasepullLastRunTime().toString());
|
Instant lastRunTime = runBackendStepInput.getBasepullLastRunTime();
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// allow the timestamps to be adjusted by the specified number of seconds. //
|
||||||
|
// normally this would be a positive value, to move to an earlier time - but it could also //
|
||||||
|
// be a negative value, if you wanted (for some reason) to move forward in time //
|
||||||
|
// this is useful to provide overlapping windows of time, in case records are being missed. //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
Serializable basepullConfigurationValue = runBackendStepInput.getValue(RunProcessAction.BASEPULL_CONFIGURATION);
|
||||||
|
if(basepullConfigurationValue instanceof BasepullConfiguration basepullConfiguration && basepullConfiguration.getSecondsToSubtractFromLastRunTimeForTimestampQuery() != null)
|
||||||
|
{
|
||||||
|
lastRunTime = lastRunTime.minusSeconds(basepullConfiguration.getSecondsToSubtractFromLastRunTimeForTimestampQuery());
|
||||||
|
}
|
||||||
|
|
||||||
|
return (lastRunTime.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -132,6 +148,14 @@ public class ExtractViaBasepullQueryStep extends ExtractViaQueryStep
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
protected String getThisRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
protected String getThisRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
||||||
{
|
{
|
||||||
return (runBackendStepInput.getValueInstant(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY).toString());
|
Instant thisRunTime = runBackendStepInput.getValueInstant(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY);
|
||||||
|
|
||||||
|
Serializable basepullConfigurationValue = runBackendStepInput.getValue(RunProcessAction.BASEPULL_CONFIGURATION);
|
||||||
|
if(basepullConfigurationValue instanceof BasepullConfiguration basepullConfiguration && basepullConfiguration.getSecondsToSubtractFromThisRunTimeForTimestampQuery() != null)
|
||||||
|
{
|
||||||
|
thisRunTime = thisRunTime.minusSeconds(basepullConfiguration.getSecondsToSubtractFromThisRunTimeForTimestampQuery());
|
||||||
|
}
|
||||||
|
|
||||||
|
return (thisRunTime.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -97,4 +97,87 @@ class ExtractViaBasepullQueryStepTest extends BaseTest
|
|||||||
.withValues(Map.of("queryFilterJson", "{}"))));
|
.withValues(Map.of("queryFilterJson", "{}"))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
void testSubtractingSeconds() throws QException
|
||||||
|
{
|
||||||
|
String originalLastRunTime = "2023-12-28T15:00:00Z";
|
||||||
|
String lastRunTimeMinusOneMinute = "2023-12-28T14:59:00Z";
|
||||||
|
|
||||||
|
String originalThisRunTime = "2023-12-28T15:05:00Z";
|
||||||
|
String thisRunTimePlusFiveSeconds = "2023-12-28T15:05:05Z";
|
||||||
|
|
||||||
|
///////////////////////////
|
||||||
|
// cases for lastRunTime //
|
||||||
|
///////////////////////////
|
||||||
|
{
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// confirm we don't fail (and don't subtract) if config is absent from input //
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
RunBackendStepInput input = new RunBackendStepInput();
|
||||||
|
input.setBasepullLastRunTime(Instant.parse(originalLastRunTime));
|
||||||
|
String lastRunTimeString = new ExtractViaBasepullQueryStep().getLastRunTimeString(input);
|
||||||
|
assertEquals(originalLastRunTime, lastRunTimeString);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// confirm we don't fail or subtract if secondsToSubtract isn't given in config //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////
|
||||||
|
RunBackendStepInput input = new RunBackendStepInput();
|
||||||
|
input.setBasepullLastRunTime(Instant.parse(originalLastRunTime));
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration());
|
||||||
|
String lastRunTimeString = new ExtractViaBasepullQueryStep().getLastRunTimeString(input);
|
||||||
|
assertEquals(originalLastRunTime, lastRunTimeString);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
// confirm we do subtract if a subtract value is given in the config //
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
RunBackendStepInput input = new RunBackendStepInput();
|
||||||
|
input.setBasepullLastRunTime(Instant.parse(originalLastRunTime));
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration()
|
||||||
|
.withSecondsToSubtractFromLastRunTimeForTimestampQuery(60));
|
||||||
|
String lastRunTimeString = new ExtractViaBasepullQueryStep().getLastRunTimeString(input);
|
||||||
|
assertEquals(lastRunTimeMinusOneMinute, lastRunTimeString);
|
||||||
|
}
|
||||||
|
|
||||||
|
///////////////////////////
|
||||||
|
// cases for thisRunTime //
|
||||||
|
///////////////////////////
|
||||||
|
{
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
// confirm we don't fail (and don't subtract) if config is absent from input //
|
||||||
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
RunBackendStepInput input = new RunBackendStepInput();
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY, originalThisRunTime);
|
||||||
|
String thisRunTimeString = new ExtractViaBasepullQueryStep().getThisRunTimeString(input);
|
||||||
|
assertEquals(originalThisRunTime, thisRunTimeString);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// confirm we don't fail or subtract if secondsToSubtract isn't given in config //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////
|
||||||
|
RunBackendStepInput input = new RunBackendStepInput();
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY, originalThisRunTime);
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration());
|
||||||
|
String thisRunTimeString = new ExtractViaBasepullQueryStep().getThisRunTimeString(input);
|
||||||
|
assertEquals(originalThisRunTime, thisRunTimeString);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
// confirm we do subtract if a subtract value is given in the config //
|
||||||
|
///////////////////////////////////////////////////////////////////////
|
||||||
|
RunBackendStepInput input = new RunBackendStepInput();
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY, originalThisRunTime);
|
||||||
|
input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration()
|
||||||
|
.withSecondsToSubtractFromThisRunTimeForTimestampQuery(-5));
|
||||||
|
String thisRunTimeString = new ExtractViaBasepullQueryStep().getThisRunTimeString(input);
|
||||||
|
assertEquals(thisRunTimePlusFiveSeconds, thisRunTimeString);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user