From c2bdcb94657e2e149678cd78720c77ecb97701b6 Mon Sep 17 00:00:00 2001 From: Darin Kelkhoff Date: Wed, 15 Nov 2023 08:35:30 -0600 Subject: [PATCH] Update to use static ThreadPoolExecutors --- .../actions/AbstractQActionBiConsumer.java | 3 +- .../core/actions/AbstractQActionFunction.java | 3 +- .../backend/core/actions/ActionHelper.java | 29 +++++++++++++++++++ .../core/actions/async/AsyncJobManager.java | 20 ++++++++++++- 4 files changed, 50 insertions(+), 5 deletions(-) diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionBiConsumer.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionBiConsumer.java index c22d3dac..fa43d2f1 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionBiConsumer.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionBiConsumer.java @@ -23,7 +23,6 @@ package com.kingsrook.qqq.backend.core.actions; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.kingsrook.qqq.backend.core.context.CapturedContext; import com.kingsrook.qqq.backend.core.context.QContext; @@ -54,7 +53,7 @@ public abstract class AbstractQActionBiConsumer completableFuture = new CompletableFuture<>(); - Executors.newCachedThreadPool().submit(() -> + ActionHelper.getExecutorService().submit(() -> { try { diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionFunction.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionFunction.java index 75388690..dea3e082 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionFunction.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/AbstractQActionFunction.java @@ -23,7 +23,6 @@ package com.kingsrook.qqq.backend.core.actions; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.kingsrook.qqq.backend.core.context.CapturedContext; import com.kingsrook.qqq.backend.core.context.QContext; @@ -54,7 +53,7 @@ public abstract class AbstractQActionFunction completableFuture = new CompletableFuture<>(); - Executors.newCachedThreadPool().submit(() -> + ActionHelper.getExecutorService().submit(() -> { try { diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/ActionHelper.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/ActionHelper.java index 1e1ce8f0..006b6ac4 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/ActionHelper.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/ActionHelper.java @@ -24,6 +24,10 @@ package com.kingsrook.qqq.backend.core.actions; import java.io.Serializable; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.exceptions.QAuthenticationException; @@ -40,6 +44,20 @@ import com.kingsrook.qqq.backend.core.modules.authentication.QAuthenticationModu *******************************************************************************/ public class ActionHelper { + ///////////////////////////////////////////////////////////////////////////// + // we would probably use Executors.newCachedThreadPool() - but - it has no // + // maxPoolSize... we think some limit is good, so that at a large number // + // of attempted concurrent jobs we'll have new jobs block, rather than // + // exhausting all server resources and locking up "everything" // + // also, it seems like keeping a handful of core-threads around is very // + // little actual waste, and better than ever wasting time starting a new // + // one, which we know we'll often be doing. // + ///////////////////////////////////////////////////////////////////////////// + private static Integer CORE_THREADS = 8; + private static Integer MAX_THREADS = 500; + private static ExecutorService executorService = new ThreadPoolExecutor(CORE_THREADS, MAX_THREADS, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + + /******************************************************************************* ** @@ -69,6 +87,17 @@ public class ActionHelper + /******************************************************************************* + ** access an executor service for sharing among the executeAsync methods of all + ** actions. + *******************************************************************************/ + static ExecutorService getExecutorService() + { + return (executorService); + } + + + /******************************************************************************* ** *******************************************************************************/ diff --git a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java index 14b9fad6..e4871048 100644 --- a/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java +++ b/qqq-backend-core/src/main/java/com/kingsrook/qqq/backend/core/actions/async/AsyncJobManager.java @@ -28,6 +28,9 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.kingsrook.qqq.backend.core.context.CapturedContext; @@ -51,9 +54,24 @@ public class AsyncJobManager { private static final QLogger LOG = QLogger.getLogger(AsyncJobManager.class); + ///////////////////////////////////////////////////////////////////////////// + // we would probably use Executors.newCachedThreadPool() - but - it has no // + // maxPoolSize... we think some limit is good, so that at a large number // + // of attempted concurrent jobs we'll have new jobs block, rather than // + // exhausting all server resources and locking up "everything" // + // also, it seems like keeping a handful of core-threads around is very // + // little actual waste, and better than ever wasting time starting a new // + // one, which we know we'll often be doing. // + ///////////////////////////////////////////////////////////////////////////// + private static Integer CORE_THREADS = 8; + private static Integer MAX_THREADS = 500; + private static ExecutorService executorService = new ThreadPoolExecutor(CORE_THREADS, MAX_THREADS, 60L, TimeUnit.SECONDS, new SynchronousQueue<>()); + + private String forcedJobUUID = null; + /******************************************************************************* ** Start a job - if it finishes within the specified timeout, get its results, ** else, get back an exception with the job id. @@ -84,7 +102,7 @@ public class AsyncJobManager { QContext.init(capturedContext); return (runAsyncJob(jobName, asyncJob, uuidAndTypeStateKey, asyncJobStatus)); - }); + }, executorService); if(timeout == 0) {