mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
Merge pull request #81 from Kingsrook/feature/CE-978-crashing-nodes
Feature/ce 978 crashing nodes
This commit is contained in:
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.state;
|
|||||||
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -57,4 +58,10 @@ public abstract class AbstractStateKey implements Serializable
|
|||||||
@Override
|
@Override
|
||||||
public abstract String toString();
|
public abstract String toString();
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Require all state keys to implement the getStartTime method
|
||||||
|
*
|
||||||
|
*******************************************************************************/
|
||||||
|
public abstract Instant getStartTime();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -23,9 +23,16 @@ package com.kingsrook.qqq.backend.core.state;
|
|||||||
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||||
|
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -33,10 +40,16 @@ import java.util.Optional;
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public class InMemoryStateProvider implements StateProviderInterface
|
public class InMemoryStateProvider implements StateProviderInterface
|
||||||
{
|
{
|
||||||
|
private static final QLogger LOG = QLogger.getLogger(InMemoryStateProvider.class);
|
||||||
|
|
||||||
private static InMemoryStateProvider instance;
|
private static InMemoryStateProvider instance;
|
||||||
|
|
||||||
private final Map<AbstractStateKey, Object> map;
|
private final Map<AbstractStateKey, Object> map;
|
||||||
|
|
||||||
|
private static int jobPeriodSeconds = 60 * 60; // 1 hour
|
||||||
|
private static int cleanHours = 6;
|
||||||
|
private static int jobInitialDelay = 60 * 60 * cleanHours;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
@ -45,6 +58,41 @@ public class InMemoryStateProvider implements StateProviderInterface
|
|||||||
private InMemoryStateProvider()
|
private InMemoryStateProvider()
|
||||||
{
|
{
|
||||||
this.map = new HashMap<>();
|
this.map = new HashMap<>();
|
||||||
|
|
||||||
|
///////////////////////////////////////////////////////////
|
||||||
|
// Start a single thread executor to handle the cleaning //
|
||||||
|
///////////////////////////////////////////////////////////
|
||||||
|
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
executorService.scheduleAtFixedRate(new InMemoryStateProvider.InMemoryStateProviderCleanJob(), jobInitialDelay, jobPeriodSeconds, TimeUnit.SECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Runnable that gets scheduled to periodically clean the InMemoryStateProvider
|
||||||
|
*******************************************************************************/
|
||||||
|
private static class InMemoryStateProviderCleanJob implements Runnable
|
||||||
|
{
|
||||||
|
private static final QLogger LOG = QLogger.getLogger(InMemoryStateProvider.InMemoryStateProviderCleanJob.class);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** run
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void run()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Instant cleanTime = Instant.now().minus(cleanHours, ChronoUnit.HOURS);
|
||||||
|
getInstance().clean(cleanTime);
|
||||||
|
}
|
||||||
|
catch(Exception e)
|
||||||
|
{
|
||||||
|
LOG.warn("Error cleaning InMemoryStateProvider entries.", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -101,4 +149,24 @@ public class InMemoryStateProvider implements StateProviderInterface
|
|||||||
map.remove(key);
|
map.remove(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Clean entries that started before the given Instant
|
||||||
|
*
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void clean(Instant cleanBeforeInstant)
|
||||||
|
{
|
||||||
|
long jobStartTime = System.currentTimeMillis();
|
||||||
|
Integer beforeSize = map.size();
|
||||||
|
LOG.info("Starting clean for InMemoryStateProvider.", logPair("beforeSize", beforeSize));
|
||||||
|
|
||||||
|
map.entrySet().removeIf(e -> e.getKey().getStartTime().isBefore(cleanBeforeInstant));
|
||||||
|
|
||||||
|
Integer afterSize = map.size();
|
||||||
|
long endTime = System.currentTimeMillis();
|
||||||
|
LOG.info("Completed clean for InMemoryStateProvider.", logPair("beforeSize", beforeSize), logPair("afterSize", afterSize), logPair("amountCleaned", (beforeSize - afterSize)), logPair("runTimeMillis", (endTime - jobStartTime)));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,9 @@
|
|||||||
package com.kingsrook.qqq.backend.core.state;
|
package com.kingsrook.qqq.backend.core.state;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
@ -93,4 +96,17 @@ public class SimpleStateKey<T> extends AbstractStateKey
|
|||||||
{
|
{
|
||||||
return key.hashCode();
|
return key.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for startTime
|
||||||
|
*******************************************************************************/
|
||||||
|
public Instant getStartTime()
|
||||||
|
{
|
||||||
|
//////////////////////////////////////////
|
||||||
|
// For now these will never get cleaned //
|
||||||
|
//////////////////////////////////////////
|
||||||
|
return (Instant.now());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.state;
|
|||||||
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
|
||||||
|
|
||||||
@ -58,4 +59,8 @@ public interface StateProviderInterface
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
void remove(AbstractStateKey key);
|
void remove(AbstractStateKey key);
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Clean entries that started before the given Instant
|
||||||
|
*******************************************************************************/
|
||||||
|
void clean(Instant startTime);
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ import java.io.File;
|
|||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||||
import com.kingsrook.qqq.backend.core.utils.JsonUtils;
|
import com.kingsrook.qqq.backend.core.utils.JsonUtils;
|
||||||
@ -126,6 +127,19 @@ public class TempFileStateProvider implements StateProviderInterface
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Clean entries that started before the given Instant
|
||||||
|
*******************************************************************************/
|
||||||
|
@Override
|
||||||
|
public void clean(Instant startTime)
|
||||||
|
{
|
||||||
|
////////////////////////////////
|
||||||
|
// Not supported at this time //
|
||||||
|
////////////////////////////////
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*******************************************************************************
|
/*******************************************************************************
|
||||||
** Get the file referenced by a key
|
** Get the file referenced by a key
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
|
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.state;
|
|||||||
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
@ -34,6 +35,7 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
|||||||
{
|
{
|
||||||
private final UUID uuid;
|
private final UUID uuid;
|
||||||
private final StateType stateType;
|
private final StateType stateType;
|
||||||
|
private final Instant startTime;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@ -43,7 +45,7 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
|||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public UUIDAndTypeStateKey(StateType stateType)
|
public UUIDAndTypeStateKey(StateType stateType)
|
||||||
{
|
{
|
||||||
this(UUID.randomUUID(), stateType);
|
this(UUID.randomUUID(), stateType, Instant.now());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -53,9 +55,21 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
|||||||
**
|
**
|
||||||
*******************************************************************************/
|
*******************************************************************************/
|
||||||
public UUIDAndTypeStateKey(UUID uuid, StateType stateType)
|
public UUIDAndTypeStateKey(UUID uuid, StateType stateType)
|
||||||
|
{
|
||||||
|
this(uuid, stateType, Instant.now());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Constructor where user can supply the UUID.
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
public UUIDAndTypeStateKey(UUID uuid, StateType stateType, Instant startTime)
|
||||||
{
|
{
|
||||||
this.uuid = uuid;
|
this.uuid = uuid;
|
||||||
this.stateType = stateType;
|
this.stateType = stateType;
|
||||||
|
this.startTime = startTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -133,4 +147,15 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
|||||||
{
|
{
|
||||||
return "{uuid=" + uuid + ", stateType=" + stateType + '}';
|
return "{uuid=" + uuid + ", stateType=" + stateType + '}';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
** Getter for startTime
|
||||||
|
*******************************************************************************/
|
||||||
|
public Instant getStartTime()
|
||||||
|
{
|
||||||
|
return (this.startTime);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -22,6 +22,8 @@
|
|||||||
package com.kingsrook.qqq.backend.core.state;
|
package com.kingsrook.qqq.backend.core.state;
|
||||||
|
|
||||||
|
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.time.temporal.ChronoUnit;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import com.kingsrook.qqq.backend.core.BaseTest;
|
import com.kingsrook.qqq.backend.core.BaseTest;
|
||||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||||
@ -88,4 +90,42 @@ public class InMemoryStateProviderTest extends BaseTest
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/*******************************************************************************
|
||||||
|
**
|
||||||
|
*******************************************************************************/
|
||||||
|
@Test
|
||||||
|
public void testClean()
|
||||||
|
{
|
||||||
|
InMemoryStateProvider stateProvider = InMemoryStateProvider.getInstance();
|
||||||
|
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
// Add an entry that is 3 hours old, should not be cleaned //
|
||||||
|
/////////////////////////////////////////////////////////////
|
||||||
|
UUIDAndTypeStateKey newKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(3, ChronoUnit.HOURS));
|
||||||
|
String newUUID = UUID.randomUUID().toString();
|
||||||
|
QRecord newQRecord = new QRecord().withValue("uuid", newUUID);
|
||||||
|
stateProvider.put(newKey, newQRecord);
|
||||||
|
|
||||||
|
////////////////////////////////////////////////////////////
|
||||||
|
// Add an entry that is 5 hours old, it should be cleaned //
|
||||||
|
////////////////////////////////////////////////////////////
|
||||||
|
UUIDAndTypeStateKey oldKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(5, ChronoUnit.HOURS));
|
||||||
|
String oldUUID = UUID.randomUUID().toString();
|
||||||
|
QRecord oldQRecord = new QRecord().withValue("uuid", oldUUID);
|
||||||
|
stateProvider.put(oldKey, oldQRecord);
|
||||||
|
|
||||||
|
///////////////////
|
||||||
|
// Call to clean //
|
||||||
|
///////////////////
|
||||||
|
stateProvider.clean(Instant.now().minus(4, ChronoUnit.HOURS));
|
||||||
|
|
||||||
|
QRecord qRecordFromState = stateProvider.get(QRecord.class, newKey).get();
|
||||||
|
Assertions.assertEquals(newUUID, qRecordFromState.getValueString("uuid"), "Should read value from state persistence");
|
||||||
|
|
||||||
|
Assertions.assertTrue(stateProvider.get(QRecord.class, oldKey).isEmpty(), "Key not found in state should return empty");
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
Reference in New Issue
Block a user