Merged feature/process-locks-bulk into dev

This commit is contained in:
2025-01-22 16:43:56 -06:00
4 changed files with 584 additions and 114 deletions

View File

@ -95,7 +95,7 @@ public class ProcessLockMetaDataProducer implements MetaDataProducerInterface<Me
.withRightTable(ProcessLock.TABLE_NAME) .withRightTable(ProcessLock.TABLE_NAME)
.withInferredName() .withInferredName()
.withType(JoinType.ONE_TO_MANY) .withType(JoinType.ONE_TO_MANY)
.withJoinOn(new JoinOn("name", "processLockTypeId")) .withJoinOn(new JoinOn("id", "processLockTypeId"))
); );
return output; return output;

View File

@ -0,0 +1,53 @@
/*
* QQQ - Low-code Application Framework for Engineers.
* Copyright (C) 2021-2024. Kingsrook, LLC
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
* contact@kingsrook.com
* https://github.com/Kingsrook/
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.kingsrook.qqq.backend.core.processes.locks;
/***************************************************************************
** Record to hold either a processLock, or an unableToObtainProcessLockException.
** Used as return value from bulk-methods in ProcessLockUtils (where some
** requested keys may succeed and return a lock, and others may fail
** and return the exception).
***************************************************************************/
public record ProcessLockOrException(ProcessLock processLock, UnableToObtainProcessLockException unableToObtainProcessLockException)
{
/*******************************************************************************
** Constructor
**
*******************************************************************************/
public ProcessLockOrException(ProcessLock processLock)
{
this(processLock, null);
}
/*******************************************************************************
** Constructor
**
*******************************************************************************/
public ProcessLockOrException(UnableToObtainProcessLockException unableToObtainProcessLockException)
{
this(null, unableToObtainProcessLockException);
}
}

View File

@ -22,14 +22,24 @@
package com.kingsrook.qqq.backend.core.processes.locks; package com.kingsrook.qqq.backend.core.processes.locks;
import java.io.Serializable;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import com.kingsrook.qqq.backend.core.actions.tables.DeleteAction; import com.kingsrook.qqq.backend.core.actions.tables.DeleteAction;
import com.kingsrook.qqq.backend.core.actions.tables.GetAction; import com.kingsrook.qqq.backend.core.actions.tables.GetAction;
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction; import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
import com.kingsrook.qqq.backend.core.actions.values.QValueFormatter; import com.kingsrook.qqq.backend.core.actions.values.QValueFormatter;
import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.context.QContext;
@ -39,6 +49,10 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteOutput; import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.get.GetInput; import com.kingsrook.qqq.backend.core.model.actions.tables.get.GetInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput; import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
import com.kingsrook.qqq.backend.core.model.data.QRecord; import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.session.QSession; import com.kingsrook.qqq.backend.core.model.session.QSession;
@ -70,10 +84,79 @@ public class ProcessLockUtils
/******************************************************************************* /*******************************************************************************
** try to create a process lock, of a given key & type - but immediately fail
** if the lock already exists.
** **
** @param key along with typeName, part of Unique Key for the lock.
** @param typeName along with key, part of Unique Key for the lock. Must be a
* defined lock type, from which we derive defaultExpirationSeconds.
** @param details advice to show users re: who/what created the lock.
*******************************************************************************/ *******************************************************************************/
public static ProcessLock create(String key, String typeName, String details) throws UnableToObtainProcessLockException, QException public static ProcessLock create(String key, String typeName, String details) throws UnableToObtainProcessLockException, QException
{ {
Map<String, ProcessLockOrException> locks = createMany(List.of(key), typeName, details);
return getProcessLockOrThrow(key, locks);
}
/*******************************************************************************
** try to create a process lock, of a given key & type - and re-try if it failed.
** (e.g., wait until existing lock holder releases the lock).
**
** @param key along with typeName, part of Unique Key for the lock.
** @param typeName along with key, part of Unique Key for the lock. Must be a
* defined lock type, from which we derive defaultExpirationSeconds.
** @param details advice to show users re: who/what created the lock.
** @param sleepBetweenTries how long to sleep between retries.
** @param maxWait max amount of that will be waited between call to this method
* and an eventual UnableToObtainProcessLockException (plus or minus
* one sleepBetweenTries (actually probably just plus that).
**
*******************************************************************************/
public static ProcessLock create(String key, String typeName, String details, Duration sleepBetweenTries, Duration maxWait) throws UnableToObtainProcessLockException, QException
{
Map<String, ProcessLockOrException> locks = createMany(List.of(key), typeName, details, sleepBetweenTries, maxWait);
return getProcessLockOrThrow(key, locks);
}
/***************************************************************************
** For the single-lock versions of create, either return the lock identified by
** key, or throw.
***************************************************************************/
private static ProcessLock getProcessLockOrThrow(String key, Map<String, ProcessLockOrException> locks) throws UnableToObtainProcessLockException
{
if(locks.get(key) != null && locks.get(key).processLock() != null)
{
return (locks.get(key).processLock());
}
else if(locks.get(key) != null && locks.get(key).unableToObtainProcessLockException() != null)
{
throw (locks.get(key).unableToObtainProcessLockException());
}
else
{
throw (new UnableToObtainProcessLockException("Missing key [" + key + "] in response from request to create lock. Lock not created."));
}
}
/*******************************************************************************
** try to create many process locks, of list of keys & a type - but immediately
** fail (on a one-by-one basis) if the lock already exists.
**
** @param keys along with typeName, part of Unique Key for the lock.
** @param typeName along with key, part of Unique Key for the lock. Must be a
* defined lock type, from which we derive defaultExpirationSeconds.
** @param details advice to show users re: who/what created the lock.
*******************************************************************************/
public static Map<String, ProcessLockOrException> createMany(List<String> keys, String typeName, String details) throws QException
{
Map<String, ProcessLockOrException> rs = new HashMap<>();
ProcessLockType lockType = getProcessLockTypeByName(typeName); ProcessLockType lockType = getProcessLockTypeByName(typeName);
if(lockType == null) if(lockType == null)
{ {
@ -83,6 +166,11 @@ public class ProcessLockUtils
QSession qSession = QContext.getQSession(); QSession qSession = QContext.getQSession();
Instant now = Instant.now(); Instant now = Instant.now();
Integer defaultExpirationSeconds = lockType.getDefaultExpirationSeconds();
List<ProcessLock> processLocksToInsert = new ArrayList<>();
Function<String, ProcessLock> constructProcessLockFromKey = (key) ->
{
ProcessLock processLock = new ProcessLock() ProcessLock processLock = new ProcessLock()
.withKey(key) .withKey(key)
.withProcessLockTypeId(lockType.getId()) .withProcessLockTypeId(lockType.getId())
@ -91,22 +179,74 @@ public class ProcessLockUtils
.withDetails(details) .withDetails(details)
.withCheckInTimestamp(now); .withCheckInTimestamp(now);
Integer defaultExpirationSeconds = lockType.getDefaultExpirationSeconds();
if(defaultExpirationSeconds != null) if(defaultExpirationSeconds != null)
{ {
processLock.setExpiresAtTimestamp(now.plusSeconds(defaultExpirationSeconds)); processLock.setExpiresAtTimestamp(now.plusSeconds(defaultExpirationSeconds));
} }
QRecord insertOutputRecord = tryToInsert(processLock); return (processLock);
};
//////////////////////////////////////////////////////////// for(String key : keys)
// if inserting failed... see if we can get existing lock // {
//////////////////////////////////////////////////////////// processLocksToInsert.add(constructProcessLockFromKey.apply(key));
}
Map<String, ProcessLockOrException> insertResultMap = tryToInsertMany(processLocksToInsert);
////////////////////////////////////////
// look at which (if any) keys failed //
////////////////////////////////////////
Set<String> failedKeys = new HashSet<>();
for(Map.Entry<String, ProcessLockOrException> entry : insertResultMap.entrySet())
{
if(entry.getValue().unableToObtainProcessLockException() != null)
{
failedKeys.add(entry.getKey());
}
}
//////////////////////////////////////////////////////////////////////
// if any keys failed, try to get the existing locks for those keys //
//////////////////////////////////////////////////////////////////////
Map<String, QRecord> existingLockRecords = new HashMap<>();
if(CollectionUtils.nullSafeHasContents(failedKeys))
{
QueryOutput queryOutput = new QueryAction().execute(new QueryInput(ProcessLock.TABLE_NAME).withFilter(new QQueryFilter()
.withCriteria("processLockTypeId", QCriteriaOperator.EQUALS, lockType.getId())
.withCriteria("key", QCriteriaOperator.IN, failedKeys)));
for(QRecord record : queryOutput.getRecords())
{
existingLockRecords.put(record.getValueString("key"), record);
}
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// loop over results from insert call - either adding successes to the output structure, or adding details about failures, //
// OR - deleting expired locks and trying a second insert! //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
List<Serializable> deleteIdList = new ArrayList<>();
List<ProcessLock> tryAgainList = new ArrayList<>();
Map<String, String> existingLockDetailsMap = new HashMap<>();
Map<String, ProcessLock> existingLockMap = new HashMap<>();
for(Map.Entry<String, ProcessLockOrException> entry : insertResultMap.entrySet())
{
String key = entry.getKey();
ProcessLock processLock = entry.getValue().processLock();
//////////////////////////////////////////////////////////////////////////
// if inserting failed... see if we found an existing lock for this key //
//////////////////////////////////////////////////////////////////////////
StringBuilder existingLockDetails = new StringBuilder(); StringBuilder existingLockDetails = new StringBuilder();
ProcessLock existingLock = null; ProcessLock existingLock = null;
if(CollectionUtils.nullSafeHasContents(insertOutputRecord.getErrors()))
if(processLock != null)
{ {
QRecord existingLockRecord = new GetAction().executeForRecord(new GetInput(ProcessLock.TABLE_NAME).withUniqueKey(Map.of("key", key, "processLockTypeId", lockType.getId()))); rs.put(key, new ProcessLockOrException(processLock));
}
else
{
QRecord existingLockRecord = existingLockRecords.get(key);
if(existingLockRecord != null) if(existingLockRecord != null)
{ {
existingLock = new ProcessLock(existingLockRecord); existingLock = new ProcessLock(existingLockRecord);
@ -127,6 +267,9 @@ public class ProcessLockUtils
existingLockDetails.append("; expiring at: ").append(QValueFormatter.formatDateTimeWithZone(zonedExpiresAt)); existingLockDetails.append("; expiring at: ").append(QValueFormatter.formatDateTimeWithZone(zonedExpiresAt));
} }
existingLockDetailsMap.put(key, existingLockDetails.toString());
existingLockMap.put(key, existingLock);
if(expiresAtTimestamp != null && expiresAtTimestamp.isBefore(now)) if(expiresAtTimestamp != null && expiresAtTimestamp.isBefore(now))
{ {
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
@ -134,85 +277,193 @@ public class ProcessLockUtils
///////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////
LOG.info("Existing lock has expired - deleting it and trying again.", logPair("id", existingLock.getId()), LOG.info("Existing lock has expired - deleting it and trying again.", logPair("id", existingLock.getId()),
logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", expiresAtTimestamp)); logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", expiresAtTimestamp));
new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKey(existingLock.getId())); deleteIdList.add(existingLock.getId());
insertOutputRecord = tryToInsert(processLock); tryAgainList.add(constructProcessLockFromKey.apply(key));
} }
} }
else else
{ {
///////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// if existing lock doesn't exist, try to insert again // // if existing lock doesn't exist now (e.g., it was deleted before the UC //
///////////////////////////////////////////////////////// // check failed and when we looked for it), then just try to insert it again //
insertOutputRecord = tryToInsert(processLock); ///////////////////////////////////////////////////////////////////////////////
tryAgainList.add(constructProcessLockFromKey.apply(key));
}
} }
} }
if(CollectionUtils.nullSafeHasContents(insertOutputRecord.getErrors())) /////////////////////////////////////////////////////
// if there are expired locks to delete, do so now //
/////////////////////////////////////////////////////
if(!deleteIdList.isEmpty())
{ {
///////////////////////////////////////////////////////////////////////////////// new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKeys(deleteIdList));
// if at this point, we have errors on the last attempted insert, then give up //
/////////////////////////////////////////////////////////////////////////////////
LOG.info("Errors in process lock record after attempted insert", logPair("errors", insertOutputRecord.getErrors()),
logPair("key", key), logPair("type", typeName), logPair("details", details));
throw (new UnableToObtainProcessLockException("A Process Lock already exists for key [" + key + "] of type [" + typeName + "], " + existingLockDetails)
.withExistingLock(existingLock));
} }
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// if there are any to try again (either because we just deleted their now-expired locks, or because we otherwise couldn't find their locks, do so now //
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
if(!tryAgainList.isEmpty())
{
Map<String, ProcessLockOrException> tryAgainResult = tryToInsertMany(tryAgainList);
for(Map.Entry<String, ProcessLockOrException> entry : tryAgainResult.entrySet())
{
String key = entry.getKey();
ProcessLock processLock = entry.getValue().processLock();
UnableToObtainProcessLockException unableToObtainProcessLockException = entry.getValue().unableToObtainProcessLockException();
if(processLock != null)
{
rs.put(key, new ProcessLockOrException(processLock));
}
else
{
rs.put(key, new ProcessLockOrException(Objects.requireNonNullElseGet(unableToObtainProcessLockException, () -> new UnableToObtainProcessLockException("Process lock not created, but no details available."))));
}
}
}
////////////////////////////////////////////////////////////////////
// put anything not successfully created into result map as error //
////////////////////////////////////////////////////////////////////
for(ProcessLock processLock : processLocksToInsert)
{
String key = processLock.getKey();
if(rs.containsKey(key))
{
LOG.info("Created process lock", logPair("id", processLock.getId()), LOG.info("Created process lock", logPair("id", processLock.getId()),
logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", processLock.getExpiresAtTimestamp())); logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", processLock.getExpiresAtTimestamp()));
return new ProcessLock(insertOutputRecord); }
else
{
if(existingLockDetailsMap.containsKey(key))
{
rs.put(key, new ProcessLockOrException(new UnableToObtainProcessLockException("A Process Lock already exists for key [" + key + "] of type [" + typeName + "], " + existingLockDetailsMap.get(key))
.withExistingLock(existingLockMap.get(key))));
}
else
{
rs.put(key, new ProcessLockOrException(new UnableToObtainProcessLockException("Process lock for key [" + key + "] of type [" + typeName + "] was not created...")));
}
}
}
return (rs);
} }
/******************************************************************************* /*******************************************************************************
** ** Try to do an insert - noting that an exception from the InsertAction will be
** caught in here, and placed in the records as an Error!
*******************************************************************************/ *******************************************************************************/
private static QRecord tryToInsert(ProcessLock processLock) throws QException private static Map<String, ProcessLockOrException> tryToInsertMany(List<ProcessLock> processLocks)
{ {
return new InsertAction().execute(new InsertInput(ProcessLock.TABLE_NAME).withRecordEntity(processLock)).getRecords().get(0); Map<String, ProcessLockOrException> rs = new HashMap<>();
try
{
List<QRecord> insertedRecords = new InsertAction().execute(new InsertInput(ProcessLock.TABLE_NAME).withRecordEntities(processLocks)).getRecords();
for(QRecord insertedRecord : insertedRecords)
{
String key = insertedRecord.getValueString("key");
if(CollectionUtils.nullSafeHasContents(insertedRecord.getErrors()))
{
rs.put(key, new ProcessLockOrException(new UnableToObtainProcessLockException(insertedRecord.getErrors().get(0).getMessage())));
}
else
{
rs.put(key, new ProcessLockOrException(new ProcessLock(insertedRecord)));
}
}
}
catch(Exception e)
{
for(ProcessLock processLock : processLocks)
{
rs.put(processLock.getKey(), new ProcessLockOrException(new UnableToObtainProcessLockException("Error attempting to insert process lock: " + e.getMessage())));
}
}
return (rs);
} }
/******************************************************************************* /*******************************************************************************
** try to create many process locks, of a given list of key & a type - and re-try
** upon failures (e.g., wait until existing lock holder releases the lock).
**
** @param keys along with typeName, part of Unique Key for the lock.
** @param typeName along with key, part of Unique Key for the lock. Must be a
* defined lock type, from which we derive defaultExpirationSeconds.
** @param details advice to show users re: who/what created the lock.
** @param sleepBetweenTries how long to sleep between retries.
** @param maxWait max amount of that will be waited between call to this method
* and an eventual UnableToObtainProcessLockException (plus or minus
* one sleepBetweenTries (actually probably just plus that).
** **
*******************************************************************************/ *******************************************************************************/
public static ProcessLock create(String key, String type, String holderId, Duration sleepBetweenTries, Duration maxWait) throws UnableToObtainProcessLockException, QException public static Map<String, ProcessLockOrException> createMany(List<String> keys, String typeName, String details, Duration sleepBetweenTries, Duration maxWait) throws QException
{ {
Map<String, ProcessLockOrException> rs = new HashMap<>();
Map<String, UnableToObtainProcessLockException> lastExceptionsPerKey = new HashMap<>();
Set<String> stillNeedCreated = new HashSet<>(keys);
Instant giveUpTime = Instant.now().plus(maxWait); Instant giveUpTime = Instant.now().plus(maxWait);
UnableToObtainProcessLockException lastCaughtUnableToObtainProcessLockException = null; UnableToObtainProcessLockException lastCaughtUnableToObtainProcessLockException = null;
while(true) while(true)
{ {
try Map<String, ProcessLockOrException> createManyResult = createMany(stillNeedCreated.size() == keys.size() ? keys : new ArrayList<>(stillNeedCreated), typeName, details);
for(Map.Entry<String, ProcessLockOrException> entry : createManyResult.entrySet())
{ {
ProcessLock processLock = create(key, type, holderId); String key = entry.getKey();
return (processLock); ProcessLockOrException processLockOrException = entry.getValue();
if(processLockOrException.processLock() != null)
{
rs.put(key, processLockOrException);
stillNeedCreated.remove(key);
} }
catch(UnableToObtainProcessLockException e) else if(processLockOrException.unableToObtainProcessLockException() != null)
{ {
lastCaughtUnableToObtainProcessLockException = e; lastExceptionsPerKey.put(key, processLockOrException.unableToObtainProcessLockException());
}
}
if(stillNeedCreated.isEmpty())
{
//////////////////////////////////////////////////////////
// if they've all been created now, great, return them! //
//////////////////////////////////////////////////////////
return (rs);
}
/////////////////////////////////////////////////////////////////////////////
// oops, let's sleep (if we're before the give up time) and then try again //
/////////////////////////////////////////////////////////////////////////////
if(Instant.now().plus(sleepBetweenTries).isBefore(giveUpTime)) if(Instant.now().plus(sleepBetweenTries).isBefore(giveUpTime))
{ {
SleepUtils.sleep(sleepBetweenTries); SleepUtils.sleep(sleepBetweenTries);
} }
else else
{ {
/////////////////////////////////
// else, break if out of time! //
/////////////////////////////////
break; break;
} }
} }
////////////////////////////////////////////////////////////////////////////////////////////
// any that didn't get created, they need their last error (or a new error) put in the rs //
////////////////////////////////////////////////////////////////////////////////////////////
for(String key : stillNeedCreated)
{
rs.put(key, new ProcessLockOrException(lastExceptionsPerKey.getOrDefault(key, new UnableToObtainProcessLockException("Missing key [" + key + "] in response from request to create lock. Lock not created."))));
} }
/////////////////////////////////////////////////////////////////////////////////////////////////// return (rs);
// this variable can never be null with current code-path, but prefer to be defensive regardless //
///////////////////////////////////////////////////////////////////////////////////////////////////
@SuppressWarnings("ConstantValue")
String suffix = lastCaughtUnableToObtainProcessLockException == null ? "" : ": " + lastCaughtUnableToObtainProcessLockException.getMessage();
//noinspection ConstantValue
throw (new UnableToObtainProcessLockException("Unable to obtain process lock for key [" + key + "] in type [" + type + "] after [" + maxWait + "]" + suffix)
.withExistingLock(lastCaughtUnableToObtainProcessLockException == null ? null : lastCaughtUnableToObtainProcessLockException.getExistingLock()));
} }
@ -389,27 +640,41 @@ public class ProcessLockUtils
{ {
if(id == null) if(id == null)
{ {
LOG.debug("No id passed in to releaseById - returning with noop"); LOG.debug("No ids passed in to releaseById - returning with noop");
return;
}
releaseByIds(List.of(id));
}
/*******************************************************************************
**
*******************************************************************************/
public static void releaseByIds(List<? extends Serializable> ids)
{
List<Serializable> nonNullIds = ids == null ? Collections.emptyList() : ids.stream().filter(Objects::nonNull).map(o -> (Serializable) o).toList();
if(CollectionUtils.nullSafeIsEmpty(nonNullIds))
{
LOG.debug("No ids passed in to releaseById - returning with noop");
return; return;
} }
ProcessLock processLock = null;
try try
{ {
processLock = ProcessLockUtils.getById(id); DeleteOutput deleteOutput = new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKeys(nonNullIds));
if(processLock == null) if(CollectionUtils.nullSafeHasContents(deleteOutput.getRecordsWithErrors()))
{ {
LOG.info("Process lock not found in releaseById call", logPair("id", id)); throw (new QException("Error deleting processLocks: " + deleteOutput.getRecordsWithErrors().get(0).getErrorsAsString()));
} }
LOG.info("Released process locks", logPair("ids", nonNullIds));
} }
catch(QException e) catch(QException e)
{ {
LOG.warn("Exception releasing processLock byId", e, logPair("id", id)); LOG.warn("Exception releasing processLocks byId", e, logPair("ids", ids));
}
if(processLock != null)
{
release(processLock);
} }
} }
@ -419,8 +684,6 @@ public class ProcessLockUtils
** **
*******************************************************************************/ *******************************************************************************/
public static void release(ProcessLock processLock) public static void release(ProcessLock processLock)
{
try
{ {
if(processLock == null) if(processLock == null)
{ {
@ -428,18 +691,27 @@ public class ProcessLockUtils
return; return;
} }
DeleteOutput deleteOutput = new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKey(processLock.getId())); releaseMany(List.of(processLock));
if(CollectionUtils.nullSafeHasContents(deleteOutput.getRecordsWithErrors()))
{
throw (new QException("Error deleting processLock record: " + deleteOutput.getRecordsWithErrors().get(0).getErrorsAsString()));
} }
LOG.info("Released process lock", logPair("id", processLock.getId()), logPair("key", processLock.getKey()), logPair("typeId", processLock.getProcessLockTypeId()), logPair("details", processLock.getDetails()));
}
catch(QException e) /*******************************************************************************
**
*******************************************************************************/
public static void releaseMany(List<ProcessLock> processLocks)
{ {
LOG.warn("Exception releasing processLock", e, logPair("processLockId", () -> processLock.getId())); if(CollectionUtils.nullSafeIsEmpty(processLocks))
{
LOG.debug("No process locks passed in to release - returning with noop");
return;
} }
List<Serializable> ids = processLocks.stream()
.filter(Objects::nonNull)
.map(pl -> (Serializable) pl.getId())
.toList();
releaseByIds(ids);
} }

View File

@ -25,13 +25,17 @@ package com.kingsrook.qqq.backend.core.processes.locks;
import java.time.Duration; import java.time.Duration;
import java.time.Instant; import java.time.Instant;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import com.kingsrook.qqq.backend.core.BaseTest; import com.kingsrook.qqq.backend.core.BaseTest;
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction; import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
import com.kingsrook.qqq.backend.core.context.QContext; import com.kingsrook.qqq.backend.core.context.QContext;
import com.kingsrook.qqq.backend.core.exceptions.QException; import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.instances.QInstanceValidator; import com.kingsrook.qqq.backend.core.instances.QInstanceValidator;
import com.kingsrook.qqq.backend.core.logging.QCollectingLogger;
import com.kingsrook.qqq.backend.core.logging.QLogger;
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput; import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
import com.kingsrook.qqq.backend.core.model.metadata.MetaDataProducerMultiOutput; import com.kingsrook.qqq.backend.core.model.metadata.MetaDataProducerMultiOutput;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance; import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
@ -39,6 +43,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.session.QUser; import com.kingsrook.qqq.backend.core.model.session.QUser;
import com.kingsrook.qqq.backend.core.utils.SleepUtils; import com.kingsrook.qqq.backend.core.utils.SleepUtils;
import com.kingsrook.qqq.backend.core.utils.TestUtils; import com.kingsrook.qqq.backend.core.utils.TestUtils;
import com.kingsrook.qqq.backend.core.utils.collections.ListBuilder;
import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
@ -227,6 +232,28 @@ class ProcessLockUtilsTest extends BaseTest
/*******************************************************************************
**
*******************************************************************************/
@Test
void testReleaseBadInputs()
{
//////////////////////////////////////////////////////////
// make sure we don't blow up, just noop in these cases //
//////////////////////////////////////////////////////////
QCollectingLogger qCollectingLogger = QLogger.activateCollectingLoggerForClass(ProcessLockUtils.class);
ProcessLockUtils.releaseById(null);
ProcessLockUtils.release(null);
ProcessLockUtils.releaseMany(null);
ProcessLockUtils.releaseByIds(null);
ProcessLockUtils.releaseMany(ListBuilder.of(null));
ProcessLockUtils.releaseByIds(ListBuilder.of(null));
QLogger.deactivateCollectingLoggerForClass(ProcessLockUtils.class);
assertEquals(6, qCollectingLogger.getCollectedMessages().stream().filter(m -> m.getMessage().contains("noop")).count());
}
/******************************************************************************* /*******************************************************************************
** **
*******************************************************************************/ *******************************************************************************/
@ -304,7 +331,7 @@ class ProcessLockUtilsTest extends BaseTest
////////////////////////////////////////////// //////////////////////////////////////////////
// checkin w/ a time - sets it to that time // // checkin w/ a time - sets it to that time //
////////////////////////////////////////////// //////////////////////////////////////////////
Instant specifiedTime = Instant.now(); Instant specifiedTime = Instant.now().plusSeconds(47);
ProcessLockUtils.checkIn(processLock, specifiedTime); ProcessLockUtils.checkIn(processLock, specifiedTime);
processLock = ProcessLockUtils.getById(processLock.getId()); processLock = ProcessLockUtils.getById(processLock.getId());
assertEquals(specifiedTime, processLock.getExpiresAtTimestamp()); assertEquals(specifiedTime, processLock.getExpiresAtTimestamp());
@ -380,4 +407,122 @@ class ProcessLockUtilsTest extends BaseTest
assertNull(processLock.getExpiresAtTimestamp()); assertNull(processLock.getExpiresAtTimestamp());
} }
/*******************************************************************************
**
*******************************************************************************/
@Test
void testMany() throws QException
{
/////////////////////////////////////////////////
// make sure that we can create multiple locks //
/////////////////////////////////////////////////
List<String> keys = List.of("1", "2", "3");
List<ProcessLock> processLocks = new ArrayList<>();
Map<String, ProcessLockOrException> results = ProcessLockUtils.createMany(keys, "typeA", "me");
for(String key : keys)
{
ProcessLock processLock = results.get(key).processLock();
assertNotNull(processLock.getId());
assertNotNull(processLock.getCheckInTimestamp());
assertNull(processLock.getExpiresAtTimestamp());
processLocks.add(processLock);
}
/////////////////////////////////////////////////////////
// make sure we can't create a second for the same key //
/////////////////////////////////////////////////////////
assertThatThrownBy(() -> ProcessLockUtils.create("1", "typeA", "you"))
.isInstanceOf(UnableToObtainProcessLockException.class)
.hasMessageContaining("Held by: " + QContext.getQSession().getUser().getIdReference())
.hasMessageContaining("with details: me")
.hasMessageNotContaining("expiring at: 20")
.matches(e -> ((UnableToObtainProcessLockException) e).getExistingLock() != null);
/////////////////////////////////////////////////////////
// make sure we can create another for a different key //
/////////////////////////////////////////////////////////
ProcessLockUtils.create("4", "typeA", "him");
/////////////////////////////////////////////////////////////////////
// make sure we can create another for a different type (same key) //
/////////////////////////////////////////////////////////////////////
ProcessLockUtils.create("1", "typeB", "her");
////////////////////////////////////////////////////////////////////
// now try to create some that will overlap, but one that'll work //
////////////////////////////////////////////////////////////////////
keys = List.of("3", "4", "5");
results = ProcessLockUtils.createMany(keys, "typeA", "me");
for(String key : List.of("3", "4"))
{
UnableToObtainProcessLockException exception = results.get(key).unableToObtainProcessLockException();
assertNotNull(exception);
}
ProcessLock processLock = results.get("5").processLock();
assertNotNull(processLock.getId());
assertNotNull(processLock.getCheckInTimestamp());
assertNull(processLock.getExpiresAtTimestamp());
processLocks.add(processLock);
//////////////////////////////
// make sure we can release //
//////////////////////////////
ProcessLockUtils.releaseMany(processLocks);
///////////////////////////////////////////////////////
// make sure can re-lock 1 now after it was released //
///////////////////////////////////////////////////////
processLock = ProcessLockUtils.create("1", "typeA", "you");
assertNotNull(processLock.getId());
assertEquals("you", processLock.getDetails());
}
/*******************************************************************************
**
*******************************************************************************/
@Test
void testManyWithSleep() throws QException
{
/////////////////////////////////////////////////
// make sure that we can create multiple locks //
/////////////////////////////////////////////////
List<String> keys = List.of("1", "2", "3");
Map<String, ProcessLockOrException> results0 = ProcessLockUtils.createMany(keys, "typeB", "me");
for(String key : keys)
{
assertNotNull(results0.get(key).processLock());
}
////////////////////////////////////////////////////////////
// try again - and 2 and 3 should fail, if we don't sleep //
////////////////////////////////////////////////////////////
keys = List.of("2", "3", "4");
Map<String, ProcessLockOrException> results1 = ProcessLockUtils.createMany(keys, "typeB", "you");
assertNull(results1.get("2").processLock());
assertNull(results1.get("3").processLock());
assertNotNull(results1.get("4").processLock());
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// try another insert, which should initially succeed for #5, then sleep, and eventually succeed on 3 & 4 as well //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
keys = List.of("3", "4", "5");
Map<String, ProcessLockOrException> results2 = ProcessLockUtils.createMany(keys, "typeB", "them", Duration.of(1, ChronoUnit.SECONDS), Duration.of(3, ChronoUnit.SECONDS));
for(String key : keys)
{
assertNotNull(results2.get(key).processLock());
}
////////////////////////////////////////////////////////////////////////////////////////////////
// make sure that we have a different ids for some that expired and then succeeded post-sleep //
////////////////////////////////////////////////////////////////////////////////////////////////
assertNotEquals(results0.get("3").processLock().getId(), results2.get("3").processLock().getId());
assertNotEquals(results1.get("4").processLock().getId(), results2.get("4").processLock().getId());
}
} }