Merge pull request #48 from Kingsrook/bugfix/thread-pools

Update to use static ThreadPoolExecutors
This commit is contained in:
t-samples
2023-11-15 12:00:24 -06:00
committed by GitHub
4 changed files with 50 additions and 5 deletions

View File

@ -23,7 +23,6 @@ package com.kingsrook.qqq.backend.core.actions;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.kingsrook.qqq.backend.core.context.CapturedContext; import com.kingsrook.qqq.backend.core.context.CapturedContext;
import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.context.QContext;
@ -54,7 +53,7 @@ public abstract class AbstractQActionBiConsumer<I extends AbstractActionInput, O
{ {
CapturedContext capturedContext = QContext.capture(); CapturedContext capturedContext = QContext.capture();
CompletableFuture<Void> completableFuture = new CompletableFuture<>(); CompletableFuture<Void> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> ActionHelper.getExecutorService().submit(() ->
{ {
try try
{ {

View File

@ -23,7 +23,6 @@ package com.kingsrook.qqq.backend.core.actions;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import com.kingsrook.qqq.backend.core.context.CapturedContext; import com.kingsrook.qqq.backend.core.context.CapturedContext;
import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.context.QContext;
@ -54,7 +53,7 @@ public abstract class AbstractQActionFunction<I extends AbstractActionInput, O e
{ {
CapturedContext capturedContext = QContext.capture(); CapturedContext capturedContext = QContext.capture();
CompletableFuture<O> completableFuture = new CompletableFuture<>(); CompletableFuture<O> completableFuture = new CompletableFuture<>();
Executors.newCachedThreadPool().submit(() -> ActionHelper.getExecutorService().submit(() ->
{ {
try try
{ {

View File

@ -24,6 +24,10 @@ package com.kingsrook.qqq.backend.core.actions;
import java.io.Serializable; import java.io.Serializable;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QAuthenticationException; import com.kingsrook.qqq.backend.core.exceptions.QAuthenticationException;
@ -40,6 +44,20 @@ import com.kingsrook.qqq.backend.core.modules.authentication.QAuthenticationModu
*******************************************************************************/ *******************************************************************************/
public class ActionHelper public class ActionHelper
{ {
/////////////////////////////////////////////////////////////////////////////
// we would probably use Executors.newCachedThreadPool() - but - it has no //
// maxPoolSize... we think some limit is good, so that at a large number //
// of attempted concurrent jobs we'll have new jobs block, rather than //
// exhausting all server resources and locking up "everything" //
// also, it seems like keeping a handful of core-threads around is very //
// little actual waste, and better than ever wasting time starting a new //
// one, which we know we'll often be doing. //
/////////////////////////////////////////////////////////////////////////////
private static Integer CORE_THREADS = 8;
private static Integer MAX_THREADS = 500;
private static ExecutorService executorService = new ThreadPoolExecutor(CORE_THREADS, MAX_THREADS, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
/******************************************************************************* /*******************************************************************************
** **
@ -69,6 +87,17 @@ public class ActionHelper
/*******************************************************************************
** access an executor service for sharing among the executeAsync methods of all
** actions.
*******************************************************************************/
static ExecutorService getExecutorService()
{
return (executorService);
}
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/

View File

@ -28,6 +28,9 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import com.kingsrook.qqq.backend.core.context.CapturedContext; import com.kingsrook.qqq.backend.core.context.CapturedContext;
@ -51,9 +54,24 @@ public class AsyncJobManager
{ {
private static final QLogger LOG = QLogger.getLogger(AsyncJobManager.class); private static final QLogger LOG = QLogger.getLogger(AsyncJobManager.class);
/////////////////////////////////////////////////////////////////////////////
// we would probably use Executors.newCachedThreadPool() - but - it has no //
// maxPoolSize... we think some limit is good, so that at a large number //
// of attempted concurrent jobs we'll have new jobs block, rather than //
// exhausting all server resources and locking up "everything" //
// also, it seems like keeping a handful of core-threads around is very //
// little actual waste, and better than ever wasting time starting a new //
// one, which we know we'll often be doing. //
/////////////////////////////////////////////////////////////////////////////
private static Integer CORE_THREADS = 8;
private static Integer MAX_THREADS = 500;
private static ExecutorService executorService = new ThreadPoolExecutor(CORE_THREADS, MAX_THREADS, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
private String forcedJobUUID = null; private String forcedJobUUID = null;
/******************************************************************************* /*******************************************************************************
** Start a job - if it finishes within the specified timeout, get its results, ** Start a job - if it finishes within the specified timeout, get its results,
** else, get back an exception with the job id. ** else, get back an exception with the job id.
@ -84,7 +102,7 @@ public class AsyncJobManager
{ {
QContext.init(capturedContext); QContext.init(capturedContext);
return (runAsyncJob(jobName, asyncJob, uuidAndTypeStateKey, asyncJobStatus)); return (runAsyncJob(jobName, asyncJob, uuidAndTypeStateKey, asyncJobStatus));
}); }, executorService);
if(timeout == 0) if(timeout == 0)
{ {