mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Add option to move timestamps, e.g., to make overlapping windows
This commit is contained in:
@ -72,7 +72,7 @@ import org.apache.commons.lang.BooleanUtils;
|
|||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Action handler for running q-processes (which are a sequence of q-functions).
|
** Action handler for running q-processes (which are a sequence of q-steps).
|
||||||
*
|
*
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public class RunProcessAction
|
public class RunProcessAction
|
||||||
@ -82,6 +82,7 @@ public class RunProcessAction
|
|||||||
public static final String BASEPULL_THIS_RUNTIME_KEY = "basepullThisRuntimeKey";
|
public static final String BASEPULL_THIS_RUNTIME_KEY = "basepullThisRuntimeKey";
|
||||||
public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey";
|
public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey";
|
||||||
public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField";
|
public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField";
|
||||||
|
public static final String BASEPULL_CONFIGURATION = "basepullConfiguration";
|
||||||
|
|
||||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
// indicator that the timestamp field should be updated - e.g., the execute step is finished. //
|
// indicator that the timestamp field should be updated - e.g., the execute step is finished. //
|
||||||
@ -633,5 +634,6 @@ public class RunProcessAction
|
|||||||
|
|
||||||
runProcessInput.getValues().put(BASEPULL_LAST_RUNTIME_KEY, lastRunTime);
|
runProcessInput.getValues().put(BASEPULL_LAST_RUNTIME_KEY, lastRunTime);
|
||||||
runProcessInput.getValues().put(BASEPULL_TIMESTAMP_FIELD, basepullConfiguration.getTimestampField());
|
runProcessInput.getValues().put(BASEPULL_TIMESTAMP_FIELD, basepullConfiguration.getTimestampField());
|
||||||
|
runProcessInput.getValues().put(BASEPULL_CONFIGURATION, basepullConfiguration);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,6 +40,9 @@ public class BasepullConfiguration implements Serializable
|
|||||||
|
|
||||||
private String timestampField; // the name of the field in the table being queried against the last-run timestamp.
|
private String timestampField; // the name of the field in the table being queried against the last-run timestamp.
|
||||||
|
|
||||||
|
private Integer secondsToSubtractFromLastRunTimeForTimestampQuery; // option to adjust the query's start-time (based on last run time) by a number of seconds.
|
||||||
|
private Integer secondsToSubtractFromThisRunTimeForTimestampQuery; // option to adjust the query's end-time (based on this run time) by a number of seconds.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -244,4 +247,66 @@ public class BasepullConfiguration implements Serializable
|
|||||||
return (this);
|
return (this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for secondsToSubtractFromLastRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public Integer getSecondsToSubtractFromLastRunTimeForTimestampQuery()
|
||||||
|
{
|
||||||
|
return (this.secondsToSubtractFromLastRunTimeForTimestampQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Setter for secondsToSubtractFromLastRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public void setSecondsToSubtractFromLastRunTimeForTimestampQuery(Integer secondsToSubtractFromLastRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromLastRunTimeForTimestampQuery = secondsToSubtractFromLastRunTimeForTimestampQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Fluent setter for secondsToSubtractFromLastRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public BasepullConfiguration withSecondsToSubtractFromLastRunTimeForTimestampQuery(Integer secondsToSubtractFromLastRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromLastRunTimeForTimestampQuery = secondsToSubtractFromLastRunTimeForTimestampQuery;
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for secondsToSubtractFromThisRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public Integer getSecondsToSubtractFromThisRunTimeForTimestampQuery()
|
||||||
|
{
|
||||||
|
return (this.secondsToSubtractFromThisRunTimeForTimestampQuery);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Setter for secondsToSubtractFromThisRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public void setSecondsToSubtractFromThisRunTimeForTimestampQuery(Integer secondsToSubtractFromThisRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromThisRunTimeForTimestampQuery = secondsToSubtractFromThisRunTimeForTimestampQuery;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Fluent setter for secondsToSubtractFromThisRunTimeForTimestampQuery
|
||||||
|
*******************************************************************************/
|
||||||
|
public BasepullConfiguration withSecondsToSubtractFromThisRunTimeForTimestampQuery(Integer secondsToSubtractFromThisRunTimeForTimestampQuery)
|
||||||
|
{
|
||||||
|
this.secondsToSubtractFromThisRunTimeForTimestampQuery = secondsToSubtractFromThisRunTimeForTimestampQuery;
|
||||||
|
return (this);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
package com.kingsrook.qqq.backend.core.processes.implementations.basepull;
|
package com.kingsrook.qqq.backend.core.processes.implementations.basepull;
|
||||||
|
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
|
import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction;
|
||||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||||
@ -122,7 +124,21 @@ public class ExtractViaBasepullQueryStep extends ExtractViaQueryStep
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
protected String getLastRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
protected String getLastRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
||||||
{
|
{
|
||||||
return (runBackendStepInput.getBasepullLastRunTime().toString());
|
Instant lastRunTime = runBackendStepInput.getBasepullLastRunTime();
|
||||||
|
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
// allow the timestamps to be adjusted by the specified number of seconds. //
|
||||||
|
// normally this would be a positive value, to move to an earlier time - but it could also //
|
||||||
|
// be a negative value, if you wanted (for some reason) to move forward in time //
|
||||||
|
// this is useful to provide overlapping windows of time, in case records are being missed. //
|
||||||
|
//////////////////////////////////////////////////////////////////////////////////////////////
|
||||||
|
Serializable basepullConfigurationValue = runBackendStepInput.getValue(RunProcessAction.BASEPULL_CONFIGURATION);
|
||||||
|
if(basepullConfigurationValue instanceof BasepullConfiguration basepullConfiguration && basepullConfiguration.getSecondsToSubtractFromLastRunTimeForTimestampQuery() != null)
|
||||||
|
{
|
||||||
|
lastRunTime = lastRunTime.minusSeconds(basepullConfiguration.getSecondsToSubtractFromLastRunTimeForTimestampQuery());
|
||||||
|
}
|
||||||
|
|
||||||
|
return (lastRunTime.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -132,6 +148,14 @@ public class ExtractViaBasepullQueryStep extends ExtractViaQueryStep
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
protected String getThisRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
protected String getThisRunTimeString(RunBackendStepInput runBackendStepInput) throws QException
|
||||||
{
|
{
|
||||||
return (runBackendStepInput.getValueInstant(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY).toString());
|
Instant thisRunTime = runBackendStepInput.getValueInstant(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY);
|
||||||
|
|
||||||
|
Serializable basepullConfigurationValue = runBackendStepInput.getValue(RunProcessAction.BASEPULL_CONFIGURATION);
|
||||||
|
if(basepullConfigurationValue instanceof BasepullConfiguration basepullConfiguration && basepullConfiguration.getSecondsToSubtractFromThisRunTimeForTimestampQuery() != null)
|
||||||
|
{
|
||||||
|
thisRunTime = thisRunTime.minusSeconds(basepullConfiguration.getSecondsToSubtractFromThisRunTimeForTimestampQuery());
|
||||||
|
}
|
||||||
|
|
||||||
|
return (thisRunTime.toString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user