From 625ed5209cb3e7633e56327a1b0a618b03e6d0e0 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Mon, 5 May 2025 10:59:12 -0500 Subject: [PATCH 1/2] switch InMemoryStateProvider to use synchronizedMap, to avoid ConcurrentModificationException in clean method --- .../core/state/InMemoryStateProvider.java | 3 +- .../core/state/InMemoryStateProviderTest.java | 40 ++++++++++++++++--- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProvider.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProvider.java index 84ddb346..91d615eb 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProvider.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProvider.java @@ -25,6 +25,7 @@ package com.kingsrook.qqq.backend.core.state; import java.io.Serializable; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -58,7 +59,7 @@ public class InMemoryStateProvider implements StateProviderInterface *******************************************************************************/ private InMemoryStateProvider() { - this.map = new HashMap<>(); + this.map = Collections.synchronizedMap(new HashMap<>()); /////////////////////////////////////////////////////////// // Start a single thread executor to handle the cleaning // diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProviderTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProviderTest.java index b8492fe8..fb87febf 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProviderTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/state/InMemoryStateProviderTest.java @@ -25,6 +25,8 @@ package com.kingsrook.qqq.backend.core.state; import java.time.Instant; import java.time.temporal.ChronoUnit; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import com.kingsrook.qqq.backend.core.BaseTest; import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData; @@ -103,17 +105,17 @@ public class InMemoryStateProviderTest extends BaseTest ///////////////////////////////////////////////////////////// // Add an entry that is 3 hours old, should not be cleaned // ///////////////////////////////////////////////////////////// - UUIDAndTypeStateKey newKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(3, ChronoUnit.HOURS)); - String newUUID = UUID.randomUUID().toString(); - QRecord newQRecord = new QRecord().withValue("uuid", newUUID); + UUIDAndTypeStateKey newKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(3, ChronoUnit.HOURS)); + String newUUID = UUID.randomUUID().toString(); + QRecord newQRecord = new QRecord().withValue("uuid", newUUID); stateProvider.put(newKey, newQRecord); //////////////////////////////////////////////////////////// // Add an entry that is 5 hours old, it should be cleaned // //////////////////////////////////////////////////////////// - UUIDAndTypeStateKey oldKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(5, ChronoUnit.HOURS)); - String oldUUID = UUID.randomUUID().toString(); - QRecord oldQRecord = new QRecord().withValue("uuid", oldUUID); + UUIDAndTypeStateKey oldKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(5, ChronoUnit.HOURS)); + String oldUUID = UUID.randomUUID().toString(); + QRecord oldQRecord = new QRecord().withValue("uuid", oldUUID); stateProvider.put(oldKey, oldQRecord); /////////////////// @@ -125,7 +127,33 @@ public class InMemoryStateProviderTest extends BaseTest Assertions.assertEquals(newUUID, qRecordFromState.getValueString("uuid"), "Should read value from state persistence"); Assertions.assertTrue(stateProvider.get(QRecord.class, oldKey).isEmpty(), "Key not found in state should return empty"); + } + + + /******************************************************************************* + ** originally written with N=100000, but showed the error as small as 1000. + *******************************************************************************/ + @Test + void testDemonstrateConcurrentModificationIfNonSynchronizedMap() + { + int N = 1000; + InMemoryStateProvider stateProvider = InMemoryStateProvider.getInstance(); + + ExecutorService executorService = Executors.newFixedThreadPool(10); + executorService.submit(() -> + { + for(int i = 0; i < N; i++) + { + UUIDAndTypeStateKey oldKey = new UUIDAndTypeStateKey(UUID.randomUUID(), StateType.PROCESS_STATUS, Instant.now().minus(5, ChronoUnit.HOURS)); + stateProvider.put(oldKey, UUID.randomUUID()); + } + }); + + for(int i = 0; i < N; i++) + { + stateProvider.clean(Instant.now().minus(4, ChronoUnit.HOURS)); + } } } \ No newline at end of file From ce2ca3f413b0422c102305e8268a4435644e1e65 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Mon, 5 May 2025 14:11:04 -0500 Subject: [PATCH 2/2] Option to useSynchronizedCollections in RecordLookupHelper --- .../processes/utils/RecordLookupHelper.java | 45 +++++++++++--- .../utils/RecordLookupHelperTest.java | 58 +++++++++++++++++++ 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java index 94ca1986..899fd468 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelper.java @@ -23,6 +23,7 @@ package com.kingsrook.qqq.backend.core.processes.utils; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,13 +52,14 @@ import com.kingsrook.qqq.backend.core.utils.ValueUtils; *******************************************************************************/ public class RecordLookupHelper { - private Map> recordMaps = new HashMap<>(); + private Map> recordMaps; - private Map, QRecord>> uniqueKeyMaps = new HashMap<>(); + private Map, QRecord>> uniqueKeyMaps; - private Set preloadedKeys = new HashSet<>(); + private Set preloadedKeys; - private Set> disallowedOneOffLookups = new HashSet<>(); + private Set> disallowedOneOffLookups; + private boolean useSynchronizedCollections; @@ -67,6 +69,33 @@ public class RecordLookupHelper *******************************************************************************/ public RecordLookupHelper() { + this(false); + } + + + + /******************************************************************************* + ** Constructor + ** + *******************************************************************************/ + public RecordLookupHelper(boolean useSynchronizedCollections) + { + this.useSynchronizedCollections = useSynchronizedCollections; + + if(useSynchronizedCollections) + { + recordMaps = Collections.synchronizedMap(new HashMap<>()); + uniqueKeyMaps = Collections.synchronizedMap(new HashMap<>()); + preloadedKeys = Collections.synchronizedSet(new HashSet<>()); + disallowedOneOffLookups = Collections.synchronizedSet(new HashSet<>()); + } + else + { + recordMaps = new HashMap<>(); + uniqueKeyMaps = new HashMap<>(); + preloadedKeys = new HashSet<>(); + disallowedOneOffLookups = new HashSet<>(); + } } @@ -77,7 +106,7 @@ public class RecordLookupHelper public QRecord getRecordByUniqueKey(String tableName, Map uniqueKey) throws QException { String mapKey = tableName + "." + uniqueKey.keySet().stream().sorted().collect(Collectors.joining(",")); - Map, QRecord> recordMap = uniqueKeyMaps.computeIfAbsent(mapKey, (k) -> new HashMap<>()); + Map, QRecord> recordMap = uniqueKeyMaps.computeIfAbsent(mapKey, (k) -> useSynchronizedCollections ? Collections.synchronizedMap(new HashMap<>()) : new HashMap<>()); if(!recordMap.containsKey(uniqueKey)) { @@ -96,7 +125,7 @@ public class RecordLookupHelper public QRecord getRecordByKey(String tableName, String keyFieldName, Serializable key) throws QException { String mapKey = tableName + "." + keyFieldName; - Map recordMap = recordMaps.computeIfAbsent(mapKey, (k) -> new HashMap<>()); + Map recordMap = recordMaps.computeIfAbsent(mapKey, (k) -> useSynchronizedCollections ? Collections.synchronizedMap(new HashMap<>()) : new HashMap<>()); //////////////////////////////////////////////////////////// // make sure we have they key object in the expected type // @@ -150,7 +179,7 @@ public class RecordLookupHelper public void preloadRecords(String tableName, String keyFieldName, QQueryFilter filter) throws QException { String mapKey = tableName + "." + keyFieldName; - Map tableMap = recordMaps.computeIfAbsent(mapKey, s -> new HashMap<>()); + Map tableMap = recordMaps.computeIfAbsent(mapKey, s -> useSynchronizedCollections ? Collections.synchronizedMap(new HashMap<>()) : new HashMap<>()); tableMap.putAll(GeneralProcessUtils.loadTableToMap(tableName, keyFieldName, filter)); } @@ -170,7 +199,7 @@ public class RecordLookupHelper } String mapKey = tableName + "." + keyFieldName; - Map tableMap = recordMaps.computeIfAbsent(mapKey, s -> new HashMap<>()); + Map tableMap = recordMaps.computeIfAbsent(mapKey, s -> useSynchronizedCollections ? Collections.synchronizedMap(new HashMap<>()) : new HashMap<>()); QQueryFilter filter = new QQueryFilter(new QFilterCriteria(keyFieldName, QCriteriaOperator.IN, inList)); tableMap.putAll(GeneralProcessUtils.loadTableToMap(tableName, keyFieldName, filter)); diff --git a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelperTest.java b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelperTest.java index 4e2f3b30..35e8cc21 100644 --- a/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelperTest.java +++ b/qqq-backend-core/src/test/java/com/kingsrook/qqq/backend/core/processes/utils/RecordLookupHelperTest.java @@ -22,8 +22,15 @@ package com.kingsrook.qqq.backend.core.processes.utils; +import java.util.ArrayList; +import java.util.ConcurrentModificationException; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import com.kingsrook.qqq.backend.core.BaseTest; +import com.kingsrook.qqq.backend.core.context.CapturedContext; import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.model.metadata.QInstance; @@ -32,6 +39,7 @@ import com.kingsrook.qqq.backend.core.utils.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Fail.fail; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; @@ -195,4 +203,54 @@ class RecordLookupHelperTest extends BaseTest assertEquals(1, MemoryRecordStore.getStatistics().get(MemoryRecordStore.STAT_QUERIES_RAN)); } + + + /******************************************************************************* + ** run a lot of threads (eg, 100), each trying to do lots of work in a + ** shared recordLookupHelper. w/o the flag to use sync'ed collections, this + ** (usually?) fails with a ConcurrentModificationException - but with the sync'ed + ** collections, is safe. + *******************************************************************************/ + @Test + void testConcurrentModification() throws InterruptedException, ExecutionException + { + ExecutorService executorService = Executors.newFixedThreadPool(100); + RecordLookupHelper recordLookupHelper = new RecordLookupHelper(true); + + CapturedContext capture = QContext.capture(); + + List> futures = new ArrayList<>(); + for(int i = 0; i < 100; i++) + { + int finalI = i; + Future future = executorService.submit(() -> + { + QContext.init(capture); + for(int j = 0; j < 25000; j++) + { + try + { + recordLookupHelper.getRecordByKey(String.valueOf(j), "id", j); + } + catch(ConcurrentModificationException cme) + { + fail("CME!", cme); + } + catch(Exception e) + { + ////////////// + // expected // + ////////////// + } + } + }); + futures.add(future); + } + + for(Future future : futures) + { + future.get(); + } + } + } \ No newline at end of file