mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
CE-978 - Initial commit of a clean thread/method to our InMemoryStateProvider to reduce memory leak
This commit is contained in:
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.state;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -57,4 +58,10 @@ public abstract class AbstractStateKey implements Serializable
|
||||
@Override
|
||||
public abstract String toString();
|
||||
|
||||
/*******************************************************************************
|
||||
** Require all state keys to implement the getStartTime method
|
||||
*
|
||||
*******************************************************************************/
|
||||
public abstract Instant getStartTime();
|
||||
|
||||
}
|
||||
|
@ -23,9 +23,16 @@ package com.kingsrook.qqq.backend.core.state;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import static com.kingsrook.qqq.backend.core.logging.LogUtils.logPair;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -33,10 +40,15 @@ import java.util.Optional;
|
||||
*******************************************************************************/
|
||||
public class InMemoryStateProvider implements StateProviderInterface
|
||||
{
|
||||
private static final QLogger LOG = QLogger.getLogger(InMemoryStateProvider.class);
|
||||
|
||||
private static InMemoryStateProvider instance;
|
||||
|
||||
private final Map<AbstractStateKey, Object> map;
|
||||
|
||||
private int jobPeriodSeconds = 60 * 15;
|
||||
private int jobInitialDelay = 60 * 60 * 4;
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
@ -45,6 +57,41 @@ public class InMemoryStateProvider implements StateProviderInterface
|
||||
private InMemoryStateProvider()
|
||||
{
|
||||
this.map = new HashMap<>();
|
||||
|
||||
///////////////////////////////////////////////////////////
|
||||
// Start a single thread executor to handle the cleaning //
|
||||
///////////////////////////////////////////////////////////
|
||||
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
|
||||
executorService.scheduleAtFixedRate(new InMemoryStateProvider.InMemoryStateProviderCleanJob(), jobInitialDelay, jobPeriodSeconds, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Runnable that gets scheduled to periodically clean the InMemoryStateProvider
|
||||
*******************************************************************************/
|
||||
private static class InMemoryStateProviderCleanJob implements Runnable
|
||||
{
|
||||
private static final QLogger LOG = QLogger.getLogger(InMemoryStateProvider.InMemoryStateProviderCleanJob.class);
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** run
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
try
|
||||
{
|
||||
Instant cleanTime = Instant.now().minus(4, ChronoUnit.HOURS);
|
||||
getInstance().clean(cleanTime);
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
LOG.warn("Error cleaning InMemoryStateProvider entries.", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -101,4 +148,24 @@ public class InMemoryStateProvider implements StateProviderInterface
|
||||
map.remove(key);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Clean entries that started before the given Instant
|
||||
*
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
public void clean(Instant cleanBeforeInstant)
|
||||
{
|
||||
long jobStartTime = System.currentTimeMillis();
|
||||
Integer beforeSize = map.size();
|
||||
LOG.info("Starting clean for InMemoryStateProvider.", logPair("beforeSize", beforeSize));
|
||||
|
||||
map.entrySet().removeIf(e -> e.getKey().getStartTime().isBefore(cleanBeforeInstant));
|
||||
|
||||
Integer afterSize = map.size();
|
||||
long endTime = System.currentTimeMillis();
|
||||
LOG.info("Completed clean for InMemoryStateProvider.", logPair("beforeSize", beforeSize), logPair("afterSize", afterSize), logPair("amountCleaned", (beforeSize - afterSize)), logPair("runTimeMillis", (endTime - jobStartTime)));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -22,6 +22,9 @@
|
||||
package com.kingsrook.qqq.backend.core.state;
|
||||
|
||||
|
||||
import java.time.Instant;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@ -93,4 +96,17 @@ public class SimpleStateKey<T> extends AbstractStateKey
|
||||
{
|
||||
return key.hashCode();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for startTime
|
||||
*******************************************************************************/
|
||||
public Instant getStartTime()
|
||||
{
|
||||
//////////////////////////////////////////
|
||||
// For now these will never get cleaned //
|
||||
//////////////////////////////////////////
|
||||
return (Instant.now());
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.state;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.util.Optional;
|
||||
|
||||
|
||||
@ -58,4 +59,8 @@ public interface StateProviderInterface
|
||||
*******************************************************************************/
|
||||
void remove(AbstractStateKey key);
|
||||
|
||||
/*******************************************************************************
|
||||
** Clean entries that started before the given Instant
|
||||
*******************************************************************************/
|
||||
void clean(Instant startTime);
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.util.Optional;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.utils.JsonUtils;
|
||||
@ -126,6 +127,19 @@ public class TempFileStateProvider implements StateProviderInterface
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Clean entries that started before the given Instant
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
public void clean(Instant startTime)
|
||||
{
|
||||
////////////////////////////////
|
||||
// Not supported at this time //
|
||||
////////////////////////////////
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Get the file referenced by a key
|
||||
*******************************************************************************/
|
||||
|
@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.state;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Instant;
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
@ -34,6 +35,7 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
||||
{
|
||||
private final UUID uuid;
|
||||
private final StateType stateType;
|
||||
private final Instant startTime;
|
||||
|
||||
|
||||
|
||||
@ -43,7 +45,7 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
||||
*******************************************************************************/
|
||||
public UUIDAndTypeStateKey(StateType stateType)
|
||||
{
|
||||
this(UUID.randomUUID(), stateType);
|
||||
this(UUID.randomUUID(), stateType, Instant.now());
|
||||
}
|
||||
|
||||
|
||||
@ -53,9 +55,21 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
||||
**
|
||||
*******************************************************************************/
|
||||
public UUIDAndTypeStateKey(UUID uuid, StateType stateType)
|
||||
{
|
||||
this(uuid, stateType, Instant.now());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Constructor where user can supply the UUID.
|
||||
**
|
||||
*******************************************************************************/
|
||||
public UUIDAndTypeStateKey(UUID uuid, StateType stateType, Instant startTime)
|
||||
{
|
||||
this.uuid = uuid;
|
||||
this.stateType = stateType;
|
||||
this.startTime = startTime;
|
||||
}
|
||||
|
||||
|
||||
@ -133,4 +147,15 @@ public class UUIDAndTypeStateKey extends AbstractStateKey implements Serializabl
|
||||
{
|
||||
return "{uuid=" + uuid + ", stateType=" + stateType + '}';
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Getter for startTime
|
||||
*******************************************************************************/
|
||||
public Instant getStartTime()
|
||||
{
|
||||
return (this.startTime);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -22,6 +22,8 @@
|
||||
package com.kingsrook.qqq.backend.core.state;
|
||||
|
||||
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.UUID;
|
||||
import com.kingsrook.qqq.backend.core.BaseTest;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
@ -88,4 +90,42 @@ public class InMemoryStateProviderTest extends BaseTest
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Test
|
||||
public void testClean()
|
||||
{
|
||||
InMemoryStateProvider stateProvider = InMemoryStateProvider.getInstance();
|
||||
|
||||
/////////////////////////////////////////////////////////////
|
||||
// Add an entry that is 3 hours old, should not be cleaned //
|
||||
/////////////////////////////////////////////////////////////
|
||||
UUIDAndTypeStateKey newKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(3, ChronoUnit.HOURS));
|
||||
String newUUID = UUID.randomUUID().toString();
|
||||
QRecord newQRecord = new QRecord().withValue("uuid", newUUID);
|
||||
stateProvider.put(newKey, newQRecord);
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
// Add an entry that is 5 hours old, it should be cleaned //
|
||||
////////////////////////////////////////////////////////////
|
||||
UUIDAndTypeStateKey oldKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(5, ChronoUnit.HOURS));
|
||||
String oldUUID = UUID.randomUUID().toString();
|
||||
QRecord oldQRecord = new QRecord().withValue("uuid", oldUUID);
|
||||
stateProvider.put(oldKey, oldQRecord);
|
||||
|
||||
///////////////////
|
||||
// Call to clean //
|
||||
///////////////////
|
||||
stateProvider.clean(Instant.now().minus(4, ChronoUnit.HOURS));
|
||||
|
||||
QRecord qRecordFromState = stateProvider.get(QRecord.class, newKey).get();
|
||||
Assertions.assertEquals(newUUID, qRecordFromState.getValueString("uuid"), "Should read value from state persistence");
|
||||
|
||||
Assertions.assertTrue(stateProvider.get(QRecord.class, oldKey).isEmpty(), "Key not found in state should return empty");
|
||||
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user