From cfab10c8e89f491cb5271734ecd6caed15c291b7 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Thu, 28 Dec 2023 15:54:44 -0600 Subject: [PATCH 1/2] Add option to move timestamps, e.g., to make overlapping windows --- .../actions/processes/RunProcessAction.java | 4 +- .../basepull/BasepullConfiguration.java | 65 +++++++++++++++++++ .../basepull/ExtractViaBasepullQueryStep.java | 28 +++++++- 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java index 0fee0fcc..c9350e68 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/processes/RunProcessAction.java @@ -72,7 +72,7 @@ import org.apache.commons.lang.BooleanUtils; /******************************************************************************* - ** Action handler for running q-processes (which are a sequence of q-functions). + ** Action handler for running q-processes (which are a sequence of q-steps). * *******************************************************************************/ public class RunProcessAction @@ -82,6 +82,7 @@ public class RunProcessAction public static final String BASEPULL_THIS_RUNTIME_KEY = "basepullThisRuntimeKey"; public static final String BASEPULL_LAST_RUNTIME_KEY = "basepullLastRuntimeKey"; public static final String BASEPULL_TIMESTAMP_FIELD = "basepullTimestampField"; + public static final String BASEPULL_CONFIGURATION = "basepullConfiguration"; //////////////////////////////////////////////////////////////////////////////////////////////// // indicator that the timestamp field should be updated - e.g., the execute step is finished. // @@ -633,5 +634,6 @@ public class RunProcessAction runProcessInput.getValues().put(BASEPULL_LAST_RUNTIME_KEY, lastRunTime); runProcessInput.getValues().put(BASEPULL_TIMESTAMP_FIELD, basepullConfiguration.getTimestampField()); + runProcessInput.getValues().put(BASEPULL_CONFIGURATION, basepullConfiguration); } } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/BasepullConfiguration.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/BasepullConfiguration.java index e207258d..bac46b28 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/BasepullConfiguration.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/BasepullConfiguration.java @@ -40,6 +40,9 @@ public class BasepullConfiguration implements Serializable private String timestampField; // the name of the field in the table being queried against the last-run timestamp. + private Integer secondsToSubtractFromLastRunTimeForTimestampQuery; // option to adjust the query's start-time (based on last run time) by a number of seconds. + private Integer secondsToSubtractFromThisRunTimeForTimestampQuery; // option to adjust the query's end-time (based on this run time) by a number of seconds. + /******************************************************************************* @@ -244,4 +247,66 @@ public class BasepullConfiguration implements Serializable return (this); } + + + /******************************************************************************* + ** Getter for secondsToSubtractFromLastRunTimeForTimestampQuery + *******************************************************************************/ + public Integer getSecondsToSubtractFromLastRunTimeForTimestampQuery() + { + return (this.secondsToSubtractFromLastRunTimeForTimestampQuery); + } + + + + /******************************************************************************* + ** Setter for secondsToSubtractFromLastRunTimeForTimestampQuery + *******************************************************************************/ + public void setSecondsToSubtractFromLastRunTimeForTimestampQuery(Integer secondsToSubtractFromLastRunTimeForTimestampQuery) + { + this.secondsToSubtractFromLastRunTimeForTimestampQuery = secondsToSubtractFromLastRunTimeForTimestampQuery; + } + + + + /******************************************************************************* + ** Fluent setter for secondsToSubtractFromLastRunTimeForTimestampQuery + *******************************************************************************/ + public BasepullConfiguration withSecondsToSubtractFromLastRunTimeForTimestampQuery(Integer secondsToSubtractFromLastRunTimeForTimestampQuery) + { + this.secondsToSubtractFromLastRunTimeForTimestampQuery = secondsToSubtractFromLastRunTimeForTimestampQuery; + return (this); + } + + + + /******************************************************************************* + ** Getter for secondsToSubtractFromThisRunTimeForTimestampQuery + *******************************************************************************/ + public Integer getSecondsToSubtractFromThisRunTimeForTimestampQuery() + { + return (this.secondsToSubtractFromThisRunTimeForTimestampQuery); + } + + + + /******************************************************************************* + ** Setter for secondsToSubtractFromThisRunTimeForTimestampQuery + *******************************************************************************/ + public void setSecondsToSubtractFromThisRunTimeForTimestampQuery(Integer secondsToSubtractFromThisRunTimeForTimestampQuery) + { + this.secondsToSubtractFromThisRunTimeForTimestampQuery = secondsToSubtractFromThisRunTimeForTimestampQuery; + } + + + + /******************************************************************************* + ** Fluent setter for secondsToSubtractFromThisRunTimeForTimestampQuery + *******************************************************************************/ + public BasepullConfiguration withSecondsToSubtractFromThisRunTimeForTimestampQuery(Integer secondsToSubtractFromThisRunTimeForTimestampQuery) + { + this.secondsToSubtractFromThisRunTimeForTimestampQuery = secondsToSubtractFromThisRunTimeForTimestampQuery; + return (this); + } + } diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStep.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStep.java index 266df3fe..1bc357ea 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStep.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStep.java @@ -22,6 +22,8 @@ package com.kingsrook.qqq.backend.core.processes.implementations.basepull; +import java.io.Serializable; +import java.time.Instant; import java.util.List; import com.kingsrook.qqq.backend.core.actions.processes.RunProcessAction; import com.kingsrook.qqq.backend.core.exceptions.QException; @@ -122,7 +124,21 @@ public class ExtractViaBasepullQueryStep extends ExtractViaQueryStep *******************************************************************************/ protected String getLastRunTimeString(RunBackendStepInput runBackendStepInput) throws QException { - return (runBackendStepInput.getBasepullLastRunTime().toString()); + Instant lastRunTime = runBackendStepInput.getBasepullLastRunTime(); + + ////////////////////////////////////////////////////////////////////////////////////////////// + // allow the timestamps to be adjusted by the specified number of seconds. // + // normally this would be a positive value, to move to an earlier time - but it could also // + // be a negative value, if you wanted (for some reason) to move forward in time // + // this is useful to provide overlapping windows of time, in case records are being missed. // + ////////////////////////////////////////////////////////////////////////////////////////////// + Serializable basepullConfigurationValue = runBackendStepInput.getValue(RunProcessAction.BASEPULL_CONFIGURATION); + if(basepullConfigurationValue instanceof BasepullConfiguration basepullConfiguration && basepullConfiguration.getSecondsToSubtractFromLastRunTimeForTimestampQuery() != null) + { + lastRunTime = lastRunTime.minusSeconds(basepullConfiguration.getSecondsToSubtractFromLastRunTimeForTimestampQuery()); + } + + return (lastRunTime.toString()); } @@ -132,6 +148,14 @@ public class ExtractViaBasepullQueryStep extends ExtractViaQueryStep *******************************************************************************/ protected String getThisRunTimeString(RunBackendStepInput runBackendStepInput) throws QException { - return (runBackendStepInput.getValueInstant(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY).toString()); + Instant thisRunTime = runBackendStepInput.getValueInstant(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY); + + Serializable basepullConfigurationValue = runBackendStepInput.getValue(RunProcessAction.BASEPULL_CONFIGURATION); + if(basepullConfigurationValue instanceof BasepullConfiguration basepullConfiguration && basepullConfiguration.getSecondsToSubtractFromThisRunTimeForTimestampQuery() != null) + { + thisRunTime = thisRunTime.minusSeconds(basepullConfiguration.getSecondsToSubtractFromThisRunTimeForTimestampQuery()); + } + + return (thisRunTime.toString()); } } From 01c78534ef505320283514f25eb4cf93980af8f6 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Thu, 28 Dec 2023 16:20:38 -0600 Subject: [PATCH 2/2] Add test for previous commit (Add option to move timestamps, e.g., to make overlapping windows) --- .../ExtractViaBasepullQueryStepTest.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStepTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStepTest.java index 690b290e..90f2a7a6 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStepTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/implementations/basepull/ExtractViaBasepullQueryStepTest.java @@ -97,4 +97,87 @@ class ExtractViaBasepullQueryStepTest extends BaseTest .withValues(Map.of("queryFilterJson", "{}")))); } + + + /******************************************************************************* + ** + *******************************************************************************/ + @Test + void testSubtractingSeconds() throws QException + { + String originalLastRunTime = "2023-12-28T15:00:00Z"; + String lastRunTimeMinusOneMinute = "2023-12-28T14:59:00Z"; + + String originalThisRunTime = "2023-12-28T15:05:00Z"; + String thisRunTimePlusFiveSeconds = "2023-12-28T15:05:05Z"; + + /////////////////////////// + // cases for lastRunTime // + /////////////////////////// + { + /////////////////////////////////////////////////////////////////////////////// + // confirm we don't fail (and don't subtract) if config is absent from input // + /////////////////////////////////////////////////////////////////////////////// + RunBackendStepInput input = new RunBackendStepInput(); + input.setBasepullLastRunTime(Instant.parse(originalLastRunTime)); + String lastRunTimeString = new ExtractViaBasepullQueryStep().getLastRunTimeString(input); + assertEquals(originalLastRunTime, lastRunTimeString); + } + { + ////////////////////////////////////////////////////////////////////////////////// + // confirm we don't fail or subtract if secondsToSubtract isn't given in config // + ////////////////////////////////////////////////////////////////////////////////// + RunBackendStepInput input = new RunBackendStepInput(); + input.setBasepullLastRunTime(Instant.parse(originalLastRunTime)); + input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration()); + String lastRunTimeString = new ExtractViaBasepullQueryStep().getLastRunTimeString(input); + assertEquals(originalLastRunTime, lastRunTimeString); + } + { + /////////////////////////////////////////////////////////////////////// + // confirm we do subtract if a subtract value is given in the config // + /////////////////////////////////////////////////////////////////////// + RunBackendStepInput input = new RunBackendStepInput(); + input.setBasepullLastRunTime(Instant.parse(originalLastRunTime)); + input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration() + .withSecondsToSubtractFromLastRunTimeForTimestampQuery(60)); + String lastRunTimeString = new ExtractViaBasepullQueryStep().getLastRunTimeString(input); + assertEquals(lastRunTimeMinusOneMinute, lastRunTimeString); + } + + /////////////////////////// + // cases for thisRunTime // + /////////////////////////// + { + /////////////////////////////////////////////////////////////////////////////// + // confirm we don't fail (and don't subtract) if config is absent from input // + /////////////////////////////////////////////////////////////////////////////// + RunBackendStepInput input = new RunBackendStepInput(); + input.addValue(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY, originalThisRunTime); + String thisRunTimeString = new ExtractViaBasepullQueryStep().getThisRunTimeString(input); + assertEquals(originalThisRunTime, thisRunTimeString); + } + { + ////////////////////////////////////////////////////////////////////////////////// + // confirm we don't fail or subtract if secondsToSubtract isn't given in config // + ////////////////////////////////////////////////////////////////////////////////// + RunBackendStepInput input = new RunBackendStepInput(); + input.addValue(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY, originalThisRunTime); + input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration()); + String thisRunTimeString = new ExtractViaBasepullQueryStep().getThisRunTimeString(input); + assertEquals(originalThisRunTime, thisRunTimeString); + } + { + /////////////////////////////////////////////////////////////////////// + // confirm we do subtract if a subtract value is given in the config // + /////////////////////////////////////////////////////////////////////// + RunBackendStepInput input = new RunBackendStepInput(); + input.addValue(RunProcessAction.BASEPULL_THIS_RUNTIME_KEY, originalThisRunTime); + input.addValue(RunProcessAction.BASEPULL_CONFIGURATION, new BasepullConfiguration() + .withSecondsToSubtractFromThisRunTimeForTimestampQuery(-5)); + String thisRunTimeString = new ExtractViaBasepullQueryStep().getThisRunTimeString(input); + assertEquals(thisRunTimePlusFiveSeconds, thisRunTimeString); + } + } + }