mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-17 20:50:44 +00:00
Add methods to ProcessLockUtils to work in bulk (both for creating and releasing locks); fix ProcessLock join to type table (had wrong joinOn field)
This commit is contained in:
@ -95,7 +95,7 @@ public class ProcessLockMetaDataProducer implements MetaDataProducerInterface<Me
|
||||
.withRightTable(ProcessLock.TABLE_NAME)
|
||||
.withInferredName()
|
||||
.withType(JoinType.ONE_TO_MANY)
|
||||
.withJoinOn(new JoinOn("name", "processLockTypeId"))
|
||||
.withJoinOn(new JoinOn("id", "processLockTypeId"))
|
||||
);
|
||||
|
||||
return output;
|
||||
|
@ -0,0 +1,53 @@
|
||||
/*
|
||||
* QQQ - Low-code Application Framework for Engineers.
|
||||
* Copyright (C) 2021-2024. Kingsrook, LLC
|
||||
* 651 N Broad St Ste 205 # 6917 | Middletown DE 19709 | United States
|
||||
* contact@kingsrook.com
|
||||
* https://github.com/Kingsrook/
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package com.kingsrook.qqq.backend.core.processes.locks;
|
||||
|
||||
|
||||
/***************************************************************************
|
||||
** Record to hold either a processLock, or an unableToObtainProcessLockException.
|
||||
** Used as return value from bulk-methods in ProcessLockUtils (where some
|
||||
** requested keys may succeed and return a lock, and others may fail
|
||||
** and return the exception).
|
||||
***************************************************************************/
|
||||
public record ProcessLockOrException(ProcessLock processLock, UnableToObtainProcessLockException unableToObtainProcessLockException)
|
||||
{
|
||||
|
||||
/*******************************************************************************
|
||||
** Constructor
|
||||
**
|
||||
*******************************************************************************/
|
||||
public ProcessLockOrException(ProcessLock processLock)
|
||||
{
|
||||
this(processLock, null);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Constructor
|
||||
**
|
||||
*******************************************************************************/
|
||||
public ProcessLockOrException(UnableToObtainProcessLockException unableToObtainProcessLockException)
|
||||
{
|
||||
this(null, unableToObtainProcessLockException);
|
||||
}
|
||||
}
|
@ -22,14 +22,24 @@
|
||||
package com.kingsrook.qqq.backend.core.processes.locks;
|
||||
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.DeleteAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.GetAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.QueryAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.UpdateAction;
|
||||
import com.kingsrook.qqq.backend.core.actions.values.QValueFormatter;
|
||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||
@ -39,6 +49,10 @@ import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.delete.DeleteOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.get.GetInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QCriteriaOperator;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QQueryFilter;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryInput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.query.QueryOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.update.UpdateInput;
|
||||
import com.kingsrook.qqq.backend.core.model.data.QRecord;
|
||||
import com.kingsrook.qqq.backend.core.model.session.QSession;
|
||||
@ -70,10 +84,79 @@ public class ProcessLockUtils
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** try to create a process lock, of a given key & type - but immediately fail
|
||||
** if the lock already exists.
|
||||
**
|
||||
** @param key along with typeName, part of Unique Key for the lock.
|
||||
** @param typeName along with key, part of Unique Key for the lock. Must be a
|
||||
* defined lock type, from which we derive defaultExpirationSeconds.
|
||||
** @param details advice to show users re: who/what created the lock.
|
||||
*******************************************************************************/
|
||||
public static ProcessLock create(String key, String typeName, String details) throws UnableToObtainProcessLockException, QException
|
||||
{
|
||||
Map<String, ProcessLockOrException> locks = createMany(List.of(key), typeName, details);
|
||||
return getProcessLockOrThrow(key, locks);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** try to create a process lock, of a given key & type - and re-try if it failed.
|
||||
** (e.g., wait until existing lock holder releases the lock).
|
||||
**
|
||||
** @param key along with typeName, part of Unique Key for the lock.
|
||||
** @param typeName along with key, part of Unique Key for the lock. Must be a
|
||||
* defined lock type, from which we derive defaultExpirationSeconds.
|
||||
** @param details advice to show users re: who/what created the lock.
|
||||
** @param sleepBetweenTries how long to sleep between retries.
|
||||
** @param maxWait max amount of that will be waited between call to this method
|
||||
* and an eventual UnableToObtainProcessLockException (plus or minus
|
||||
* one sleepBetweenTries (actually probably just plus that).
|
||||
**
|
||||
*******************************************************************************/
|
||||
public static ProcessLock create(String key, String typeName, String details, Duration sleepBetweenTries, Duration maxWait) throws UnableToObtainProcessLockException, QException
|
||||
{
|
||||
Map<String, ProcessLockOrException> locks = createMany(List.of(key), typeName, details, sleepBetweenTries, maxWait);
|
||||
return getProcessLockOrThrow(key, locks);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/***************************************************************************
|
||||
** For the single-lock versions of create, either return the lock identified by
|
||||
** key, or throw.
|
||||
***************************************************************************/
|
||||
private static ProcessLock getProcessLockOrThrow(String key, Map<String, ProcessLockOrException> locks) throws UnableToObtainProcessLockException
|
||||
{
|
||||
if(locks.get(key) != null && locks.get(key).processLock() != null)
|
||||
{
|
||||
return (locks.get(key).processLock());
|
||||
}
|
||||
else if(locks.get(key) != null && locks.get(key).unableToObtainProcessLockException() != null)
|
||||
{
|
||||
throw (locks.get(key).unableToObtainProcessLockException());
|
||||
}
|
||||
else
|
||||
{
|
||||
throw (new UnableToObtainProcessLockException("Missing key [" + key + "] in response from request to create lock. Lock not created."));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** try to create many process locks, of list of keys & a type - but immediately
|
||||
** fail (on a one-by-one basis) if the lock already exists.
|
||||
**
|
||||
** @param keys along with typeName, part of Unique Key for the lock.
|
||||
** @param typeName along with key, part of Unique Key for the lock. Must be a
|
||||
* defined lock type, from which we derive defaultExpirationSeconds.
|
||||
** @param details advice to show users re: who/what created the lock.
|
||||
*******************************************************************************/
|
||||
public static Map<String, ProcessLockOrException> createMany(List<String> keys, String typeName, String details) throws QException
|
||||
{
|
||||
Map<String, ProcessLockOrException> rs = new HashMap<>();
|
||||
|
||||
ProcessLockType lockType = getProcessLockTypeByName(typeName);
|
||||
if(lockType == null)
|
||||
{
|
||||
@ -82,137 +165,305 @@ public class ProcessLockUtils
|
||||
|
||||
QSession qSession = QContext.getQSession();
|
||||
|
||||
Instant now = Instant.now();
|
||||
ProcessLock processLock = new ProcessLock()
|
||||
.withKey(key)
|
||||
.withProcessLockTypeId(lockType.getId())
|
||||
.withSessionUUID(ObjectUtils.tryAndRequireNonNullElse(() -> qSession.getUuid(), null))
|
||||
.withUserId(ObjectUtils.tryAndRequireNonNullElse(() -> qSession.getUser().getIdReference(), null))
|
||||
.withDetails(details)
|
||||
.withCheckInTimestamp(now);
|
||||
Instant now = Instant.now();
|
||||
Integer defaultExpirationSeconds = lockType.getDefaultExpirationSeconds();
|
||||
List<ProcessLock> processLocksToInsert = new ArrayList<>();
|
||||
|
||||
Integer defaultExpirationSeconds = lockType.getDefaultExpirationSeconds();
|
||||
if(defaultExpirationSeconds != null)
|
||||
Function<String, ProcessLock> constructProcessLockFromKey = (key) ->
|
||||
{
|
||||
processLock.setExpiresAtTimestamp(now.plusSeconds(defaultExpirationSeconds));
|
||||
ProcessLock processLock = new ProcessLock()
|
||||
.withKey(key)
|
||||
.withProcessLockTypeId(lockType.getId())
|
||||
.withSessionUUID(ObjectUtils.tryAndRequireNonNullElse(() -> qSession.getUuid(), null))
|
||||
.withUserId(ObjectUtils.tryAndRequireNonNullElse(() -> qSession.getUser().getIdReference(), null))
|
||||
.withDetails(details)
|
||||
.withCheckInTimestamp(now);
|
||||
|
||||
if(defaultExpirationSeconds != null)
|
||||
{
|
||||
processLock.setExpiresAtTimestamp(now.plusSeconds(defaultExpirationSeconds));
|
||||
}
|
||||
|
||||
return (processLock);
|
||||
};
|
||||
|
||||
for(String key : keys)
|
||||
{
|
||||
processLocksToInsert.add(constructProcessLockFromKey.apply(key));
|
||||
}
|
||||
|
||||
QRecord insertOutputRecord = tryToInsert(processLock);
|
||||
Map<String, ProcessLockOrException> insertResultMap = tryToInsertMany(processLocksToInsert);
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
// if inserting failed... see if we can get existing lock //
|
||||
////////////////////////////////////////////////////////////
|
||||
StringBuilder existingLockDetails = new StringBuilder();
|
||||
ProcessLock existingLock = null;
|
||||
if(CollectionUtils.nullSafeHasContents(insertOutputRecord.getErrors()))
|
||||
////////////////////////////////////////
|
||||
// look at which (if any) keys failed //
|
||||
////////////////////////////////////////
|
||||
Set<String> failedKeys = new HashSet<>();
|
||||
for(Map.Entry<String, ProcessLockOrException> entry : insertResultMap.entrySet())
|
||||
{
|
||||
QRecord existingLockRecord = new GetAction().executeForRecord(new GetInput(ProcessLock.TABLE_NAME).withUniqueKey(Map.of("key", key, "processLockTypeId", lockType.getId())));
|
||||
if(existingLockRecord != null)
|
||||
if(entry.getValue().unableToObtainProcessLockException() != null)
|
||||
{
|
||||
existingLock = new ProcessLock(existingLockRecord);
|
||||
if(StringUtils.hasContent(existingLock.getUserId()))
|
||||
{
|
||||
existingLockDetails.append("Held by: ").append(existingLock.getUserId());
|
||||
}
|
||||
failedKeys.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
|
||||
if(StringUtils.hasContent(existingLock.getDetails()))
|
||||
{
|
||||
existingLockDetails.append("; with details: ").append(existingLock.getDetails());
|
||||
}
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
// if any keys failed, try to get the existing locks for those keys //
|
||||
//////////////////////////////////////////////////////////////////////
|
||||
Map<String, QRecord> existingLockRecords = new HashMap<>();
|
||||
if(CollectionUtils.nullSafeHasContents(failedKeys))
|
||||
{
|
||||
QueryOutput queryOutput = new QueryAction().execute(new QueryInput(ProcessLock.TABLE_NAME).withFilter(new QQueryFilter()
|
||||
.withCriteria("processLockTypeId", QCriteriaOperator.EQUALS, lockType.getId())
|
||||
.withCriteria("key", QCriteriaOperator.IN, failedKeys)));
|
||||
for(QRecord record : queryOutput.getRecords())
|
||||
{
|
||||
existingLockRecords.put(record.getValueString("key"), record);
|
||||
}
|
||||
}
|
||||
|
||||
Instant expiresAtTimestamp = existingLock.getExpiresAtTimestamp();
|
||||
if(expiresAtTimestamp != null)
|
||||
{
|
||||
ZonedDateTime zonedExpiresAt = expiresAtTimestamp.atZone(ValueUtils.getSessionOrInstanceZoneId());
|
||||
existingLockDetails.append("; expiring at: ").append(QValueFormatter.formatDateTimeWithZone(zonedExpiresAt));
|
||||
}
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// loop over results from insert call - either adding successes to the output structure, or adding details about failures, //
|
||||
// OR - deleting expired locks and trying a second insert! //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
List<Serializable> deleteIdList = new ArrayList<>();
|
||||
List<ProcessLock> tryAgainList = new ArrayList<>();
|
||||
Map<String, String> existingLockDetailsMap = new HashMap<>();
|
||||
Map<String, ProcessLock> existingLockMap = new HashMap<>();
|
||||
for(Map.Entry<String, ProcessLockOrException> entry : insertResultMap.entrySet())
|
||||
{
|
||||
String key = entry.getKey();
|
||||
ProcessLock processLock = entry.getValue().processLock();
|
||||
|
||||
if(expiresAtTimestamp != null && expiresAtTimestamp.isBefore(now))
|
||||
{
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
// if existing lock has expired, then we can delete it and try to insert again //
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
LOG.info("Existing lock has expired - deleting it and trying again.", logPair("id", existingLock.getId()),
|
||||
logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", expiresAtTimestamp));
|
||||
new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKey(existingLock.getId()));
|
||||
insertOutputRecord = tryToInsert(processLock);
|
||||
}
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
// if inserting failed... see if we found an existing lock for this key //
|
||||
//////////////////////////////////////////////////////////////////////////
|
||||
StringBuilder existingLockDetails = new StringBuilder();
|
||||
ProcessLock existingLock = null;
|
||||
|
||||
if(processLock != null)
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(processLock));
|
||||
}
|
||||
else
|
||||
{
|
||||
/////////////////////////////////////////////////////////
|
||||
// if existing lock doesn't exist, try to insert again //
|
||||
/////////////////////////////////////////////////////////
|
||||
insertOutputRecord = tryToInsert(processLock);
|
||||
QRecord existingLockRecord = existingLockRecords.get(key);
|
||||
if(existingLockRecord != null)
|
||||
{
|
||||
existingLock = new ProcessLock(existingLockRecord);
|
||||
if(StringUtils.hasContent(existingLock.getUserId()))
|
||||
{
|
||||
existingLockDetails.append("Held by: ").append(existingLock.getUserId());
|
||||
}
|
||||
|
||||
if(StringUtils.hasContent(existingLock.getDetails()))
|
||||
{
|
||||
existingLockDetails.append("; with details: ").append(existingLock.getDetails());
|
||||
}
|
||||
|
||||
Instant expiresAtTimestamp = existingLock.getExpiresAtTimestamp();
|
||||
if(expiresAtTimestamp != null)
|
||||
{
|
||||
ZonedDateTime zonedExpiresAt = expiresAtTimestamp.atZone(ValueUtils.getSessionOrInstanceZoneId());
|
||||
existingLockDetails.append("; expiring at: ").append(QValueFormatter.formatDateTimeWithZone(zonedExpiresAt));
|
||||
}
|
||||
|
||||
existingLockDetailsMap.put(key, existingLockDetails.toString());
|
||||
existingLockMap.put(key, existingLock);
|
||||
|
||||
if(expiresAtTimestamp != null && expiresAtTimestamp.isBefore(now))
|
||||
{
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
// if existing lock has expired, then we can delete it and try to insert again //
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
LOG.info("Existing lock has expired - deleting it and trying again.", logPair("id", existingLock.getId()),
|
||||
logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", expiresAtTimestamp));
|
||||
deleteIdList.add(existingLock.getId());
|
||||
tryAgainList.add(constructProcessLockFromKey.apply(key));
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
// if existing lock doesn't exist now (e.g., it was deleted before the UC //
|
||||
// check failed and when we looked for it), then just try to insert it again //
|
||||
///////////////////////////////////////////////////////////////////////////////
|
||||
tryAgainList.add(constructProcessLockFromKey.apply(key));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if(CollectionUtils.nullSafeHasContents(insertOutputRecord.getErrors()))
|
||||
/////////////////////////////////////////////////////
|
||||
// if there are expired locks to delete, do so now //
|
||||
/////////////////////////////////////////////////////
|
||||
if(!deleteIdList.isEmpty())
|
||||
{
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
// if at this point, we have errors on the last attempted insert, then give up //
|
||||
/////////////////////////////////////////////////////////////////////////////////
|
||||
LOG.info("Errors in process lock record after attempted insert", logPair("errors", insertOutputRecord.getErrors()),
|
||||
logPair("key", key), logPair("type", typeName), logPair("details", details));
|
||||
throw (new UnableToObtainProcessLockException("A Process Lock already exists for key [" + key + "] of type [" + typeName + "], " + existingLockDetails)
|
||||
.withExistingLock(existingLock));
|
||||
new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKeys(deleteIdList));
|
||||
}
|
||||
|
||||
LOG.info("Created process lock", logPair("id", processLock.getId()),
|
||||
logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", processLock.getExpiresAtTimestamp()));
|
||||
return new ProcessLock(insertOutputRecord);
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// if there are any to try again (either because we just deleted their now-expired locks, or because we otherwise couldn't find their locks, do so now //
|
||||
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
if(!tryAgainList.isEmpty())
|
||||
{
|
||||
Map<String, ProcessLockOrException> tryAgainResult = tryToInsertMany(tryAgainList);
|
||||
for(Map.Entry<String, ProcessLockOrException> entry : tryAgainResult.entrySet())
|
||||
{
|
||||
String key = entry.getKey();
|
||||
ProcessLock processLock = entry.getValue().processLock();
|
||||
UnableToObtainProcessLockException unableToObtainProcessLockException = entry.getValue().unableToObtainProcessLockException();
|
||||
|
||||
if(processLock != null)
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(processLock));
|
||||
}
|
||||
else
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(Objects.requireNonNullElseGet(unableToObtainProcessLockException, () -> new UnableToObtainProcessLockException("Process lock not created, but no details available."))));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////
|
||||
// put anything not successfully created into result map as error //
|
||||
////////////////////////////////////////////////////////////////////
|
||||
for(ProcessLock processLock : processLocksToInsert)
|
||||
{
|
||||
String key = processLock.getKey();
|
||||
if(rs.containsKey(key))
|
||||
{
|
||||
LOG.info("Created process lock", logPair("id", processLock.getId()),
|
||||
logPair("key", key), logPair("type", typeName), logPair("details", details), logPair("expiresAtTimestamp", processLock.getExpiresAtTimestamp()));
|
||||
}
|
||||
else
|
||||
{
|
||||
if(existingLockDetailsMap.containsKey(key))
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(new UnableToObtainProcessLockException("A Process Lock already exists for key [" + key + "] of type [" + typeName + "], " + existingLockDetailsMap.get(key))
|
||||
.withExistingLock(existingLockMap.get(key))));
|
||||
}
|
||||
else
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(new UnableToObtainProcessLockException("Process lock for key [" + key + "] of type [" + typeName + "] was not created...")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (rs);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
** Try to do an insert - noting that an exception from the InsertAction will be
|
||||
** caught in here, and placed in the records as an Error!
|
||||
*******************************************************************************/
|
||||
private static QRecord tryToInsert(ProcessLock processLock) throws QException
|
||||
private static Map<String, ProcessLockOrException> tryToInsertMany(List<ProcessLock> processLocks)
|
||||
{
|
||||
return new InsertAction().execute(new InsertInput(ProcessLock.TABLE_NAME).withRecordEntity(processLock)).getRecords().get(0);
|
||||
Map<String, ProcessLockOrException> rs = new HashMap<>();
|
||||
|
||||
try
|
||||
{
|
||||
List<QRecord> insertedRecords = new InsertAction().execute(new InsertInput(ProcessLock.TABLE_NAME).withRecordEntities(processLocks)).getRecords();
|
||||
for(QRecord insertedRecord : insertedRecords)
|
||||
{
|
||||
String key = insertedRecord.getValueString("key");
|
||||
if(CollectionUtils.nullSafeHasContents(insertedRecord.getErrors()))
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(new UnableToObtainProcessLockException(insertedRecord.getErrors().get(0).getMessage())));
|
||||
}
|
||||
else
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(new ProcessLock(insertedRecord)));
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
{
|
||||
for(ProcessLock processLock : processLocks)
|
||||
{
|
||||
rs.put(processLock.getKey(), new ProcessLockOrException(new UnableToObtainProcessLockException("Error attempting to insert process lock: " + e.getMessage())));
|
||||
}
|
||||
}
|
||||
|
||||
return (rs);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** try to create many process locks, of a given list of key & a type - and re-try
|
||||
** upon failures (e.g., wait until existing lock holder releases the lock).
|
||||
**
|
||||
** @param keys along with typeName, part of Unique Key for the lock.
|
||||
** @param typeName along with key, part of Unique Key for the lock. Must be a
|
||||
* defined lock type, from which we derive defaultExpirationSeconds.
|
||||
** @param details advice to show users re: who/what created the lock.
|
||||
** @param sleepBetweenTries how long to sleep between retries.
|
||||
** @param maxWait max amount of that will be waited between call to this method
|
||||
* and an eventual UnableToObtainProcessLockException (plus or minus
|
||||
* one sleepBetweenTries (actually probably just plus that).
|
||||
**
|
||||
*******************************************************************************/
|
||||
public static ProcessLock create(String key, String type, String holderId, Duration sleepBetweenTries, Duration maxWait) throws UnableToObtainProcessLockException, QException
|
||||
public static Map<String, ProcessLockOrException> createMany(List<String> keys, String typeName, String details, Duration sleepBetweenTries, Duration maxWait) throws QException
|
||||
{
|
||||
Map<String, ProcessLockOrException> rs = new HashMap<>();
|
||||
Map<String, UnableToObtainProcessLockException> lastExceptionsPerKey = new HashMap<>();
|
||||
Set<String> stillNeedCreated = new HashSet<>(keys);
|
||||
|
||||
Instant giveUpTime = Instant.now().plus(maxWait);
|
||||
|
||||
UnableToObtainProcessLockException lastCaughtUnableToObtainProcessLockException = null;
|
||||
while(true)
|
||||
{
|
||||
try
|
||||
Map<String, ProcessLockOrException> createManyResult = createMany(stillNeedCreated.size() == keys.size() ? keys : new ArrayList<>(stillNeedCreated), typeName, details);
|
||||
for(Map.Entry<String, ProcessLockOrException> entry : createManyResult.entrySet())
|
||||
{
|
||||
ProcessLock processLock = create(key, type, holderId);
|
||||
return (processLock);
|
||||
String key = entry.getKey();
|
||||
ProcessLockOrException processLockOrException = entry.getValue();
|
||||
if(processLockOrException.processLock() != null)
|
||||
{
|
||||
rs.put(key, processLockOrException);
|
||||
stillNeedCreated.remove(key);
|
||||
}
|
||||
else if(processLockOrException.unableToObtainProcessLockException() != null)
|
||||
{
|
||||
lastExceptionsPerKey.put(key, processLockOrException.unableToObtainProcessLockException());
|
||||
}
|
||||
}
|
||||
catch(UnableToObtainProcessLockException e)
|
||||
|
||||
if(stillNeedCreated.isEmpty())
|
||||
{
|
||||
lastCaughtUnableToObtainProcessLockException = e;
|
||||
if(Instant.now().plus(sleepBetweenTries).isBefore(giveUpTime))
|
||||
{
|
||||
SleepUtils.sleep(sleepBetweenTries);
|
||||
}
|
||||
else
|
||||
{
|
||||
break;
|
||||
}
|
||||
//////////////////////////////////////////////////////////
|
||||
// if they've all been created now, great, return them! //
|
||||
//////////////////////////////////////////////////////////
|
||||
return (rs);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
// oops, let's sleep (if we're before the give up time) and then try again //
|
||||
/////////////////////////////////////////////////////////////////////////////
|
||||
if(Instant.now().plus(sleepBetweenTries).isBefore(giveUpTime))
|
||||
{
|
||||
SleepUtils.sleep(sleepBetweenTries);
|
||||
}
|
||||
else
|
||||
{
|
||||
/////////////////////////////////
|
||||
// else, break if out of time! //
|
||||
/////////////////////////////////
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// this variable can never be null with current code-path, but prefer to be defensive regardless //
|
||||
///////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
@SuppressWarnings("ConstantValue")
|
||||
String suffix = lastCaughtUnableToObtainProcessLockException == null ? "" : ": " + lastCaughtUnableToObtainProcessLockException.getMessage();
|
||||
////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// any that didn't get created, they need their last error (or a new error) put in the rs //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////
|
||||
for(String key : stillNeedCreated)
|
||||
{
|
||||
rs.put(key, new ProcessLockOrException(lastExceptionsPerKey.getOrDefault(key, new UnableToObtainProcessLockException("Missing key [" + key + "] in response from request to create lock. Lock not created."))));
|
||||
}
|
||||
|
||||
//noinspection ConstantValue
|
||||
throw (new UnableToObtainProcessLockException("Unable to obtain process lock for key [" + key + "] in type [" + type + "] after [" + maxWait + "]" + suffix)
|
||||
.withExistingLock(lastCaughtUnableToObtainProcessLockException == null ? null : lastCaughtUnableToObtainProcessLockException.getExistingLock()));
|
||||
return (rs);
|
||||
}
|
||||
|
||||
|
||||
@ -389,27 +640,41 @@ public class ProcessLockUtils
|
||||
{
|
||||
if(id == null)
|
||||
{
|
||||
LOG.debug("No id passed in to releaseById - returning with noop");
|
||||
LOG.debug("No ids passed in to releaseById - returning with noop");
|
||||
return;
|
||||
}
|
||||
|
||||
releaseByIds(List.of(id));
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public static void releaseByIds(List<? extends Serializable> ids)
|
||||
{
|
||||
List<Serializable> nonNullIds = ids == null ? Collections.emptyList() : ids.stream().filter(Objects::nonNull).map(o -> (Serializable) o).toList();
|
||||
|
||||
if(CollectionUtils.nullSafeIsEmpty(nonNullIds))
|
||||
{
|
||||
LOG.debug("No ids passed in to releaseById - returning with noop");
|
||||
return;
|
||||
}
|
||||
|
||||
ProcessLock processLock = null;
|
||||
try
|
||||
{
|
||||
processLock = ProcessLockUtils.getById(id);
|
||||
if(processLock == null)
|
||||
DeleteOutput deleteOutput = new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKeys(nonNullIds));
|
||||
if(CollectionUtils.nullSafeHasContents(deleteOutput.getRecordsWithErrors()))
|
||||
{
|
||||
LOG.info("Process lock not found in releaseById call", logPair("id", id));
|
||||
throw (new QException("Error deleting processLocks: " + deleteOutput.getRecordsWithErrors().get(0).getErrorsAsString()));
|
||||
}
|
||||
|
||||
LOG.info("Released process locks", logPair("ids", nonNullIds));
|
||||
}
|
||||
catch(QException e)
|
||||
{
|
||||
LOG.warn("Exception releasing processLock byId", e, logPair("id", id));
|
||||
}
|
||||
|
||||
if(processLock != null)
|
||||
{
|
||||
release(processLock);
|
||||
LOG.warn("Exception releasing processLocks byId", e, logPair("ids", ids));
|
||||
}
|
||||
}
|
||||
|
||||
@ -420,26 +685,33 @@ public class ProcessLockUtils
|
||||
*******************************************************************************/
|
||||
public static void release(ProcessLock processLock)
|
||||
{
|
||||
try
|
||||
if(processLock == null)
|
||||
{
|
||||
if(processLock == null)
|
||||
{
|
||||
LOG.debug("No process lock passed in to release - returning with noop");
|
||||
return;
|
||||
}
|
||||
|
||||
DeleteOutput deleteOutput = new DeleteAction().execute(new DeleteInput(ProcessLock.TABLE_NAME).withPrimaryKey(processLock.getId()));
|
||||
if(CollectionUtils.nullSafeHasContents(deleteOutput.getRecordsWithErrors()))
|
||||
{
|
||||
throw (new QException("Error deleting processLock record: " + deleteOutput.getRecordsWithErrors().get(0).getErrorsAsString()));
|
||||
}
|
||||
|
||||
LOG.info("Released process lock", logPair("id", processLock.getId()), logPair("key", processLock.getKey()), logPair("typeId", processLock.getProcessLockTypeId()), logPair("details", processLock.getDetails()));
|
||||
LOG.debug("No process lock passed in to release - returning with noop");
|
||||
return;
|
||||
}
|
||||
catch(QException e)
|
||||
|
||||
releaseMany(List.of(processLock));
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
public static void releaseMany(List<ProcessLock> processLocks)
|
||||
{
|
||||
if(CollectionUtils.nullSafeIsEmpty(processLocks))
|
||||
{
|
||||
LOG.warn("Exception releasing processLock", e, logPair("processLockId", () -> processLock.getId()));
|
||||
LOG.debug("No process locks passed in to release - returning with noop");
|
||||
return;
|
||||
}
|
||||
|
||||
List<Serializable> ids = processLocks.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.map(pl -> (Serializable) pl.getId())
|
||||
.toList();
|
||||
releaseByIds(ids);
|
||||
}
|
||||
|
||||
|
||||
|
@ -25,13 +25,17 @@ package com.kingsrook.qqq.backend.core.processes.locks;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import com.kingsrook.qqq.backend.core.BaseTest;
|
||||
import com.kingsrook.qqq.backend.core.actions.tables.InsertAction;
|
||||
import com.kingsrook.qqq.backend.core.context.QContext;
|
||||
import com.kingsrook.qqq.backend.core.exceptions.QException;
|
||||
import com.kingsrook.qqq.backend.core.instances.QInstanceValidator;
|
||||
import com.kingsrook.qqq.backend.core.logging.QCollectingLogger;
|
||||
import com.kingsrook.qqq.backend.core.logging.QLogger;
|
||||
import com.kingsrook.qqq.backend.core.model.actions.tables.insert.InsertInput;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.MetaDataProducerMultiOutput;
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
|
||||
@ -39,6 +43,7 @@ import com.kingsrook.qqq.backend.core.model.metadata.tables.QTableMetaData;
|
||||
import com.kingsrook.qqq.backend.core.model.session.QUser;
|
||||
import com.kingsrook.qqq.backend.core.utils.SleepUtils;
|
||||
import com.kingsrook.qqq.backend.core.utils.TestUtils;
|
||||
import com.kingsrook.qqq.backend.core.utils.collections.ListBuilder;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
@ -227,6 +232,28 @@ class ProcessLockUtilsTest extends BaseTest
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Test
|
||||
void testReleaseBadInputs()
|
||||
{
|
||||
//////////////////////////////////////////////////////////
|
||||
// make sure we don't blow up, just noop in these cases //
|
||||
//////////////////////////////////////////////////////////
|
||||
QCollectingLogger qCollectingLogger = QLogger.activateCollectingLoggerForClass(ProcessLockUtils.class);
|
||||
ProcessLockUtils.releaseById(null);
|
||||
ProcessLockUtils.release(null);
|
||||
ProcessLockUtils.releaseMany(null);
|
||||
ProcessLockUtils.releaseByIds(null);
|
||||
ProcessLockUtils.releaseMany(ListBuilder.of(null));
|
||||
ProcessLockUtils.releaseByIds(ListBuilder.of(null));
|
||||
QLogger.deactivateCollectingLoggerForClass(ProcessLockUtils.class);
|
||||
assertEquals(6, qCollectingLogger.getCollectedMessages().stream().filter(m -> m.getMessage().contains("noop")).count());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@ -304,7 +331,7 @@ class ProcessLockUtilsTest extends BaseTest
|
||||
//////////////////////////////////////////////
|
||||
// checkin w/ a time - sets it to that time //
|
||||
//////////////////////////////////////////////
|
||||
Instant specifiedTime = Instant.now();
|
||||
Instant specifiedTime = Instant.now().plusSeconds(47);
|
||||
ProcessLockUtils.checkIn(processLock, specifiedTime);
|
||||
processLock = ProcessLockUtils.getById(processLock.getId());
|
||||
assertEquals(specifiedTime, processLock.getExpiresAtTimestamp());
|
||||
@ -380,4 +407,122 @@ class ProcessLockUtilsTest extends BaseTest
|
||||
assertNull(processLock.getExpiresAtTimestamp());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Test
|
||||
void testMany() throws QException
|
||||
{
|
||||
/////////////////////////////////////////////////
|
||||
// make sure that we can create multiple locks //
|
||||
/////////////////////////////////////////////////
|
||||
List<String> keys = List.of("1", "2", "3");
|
||||
List<ProcessLock> processLocks = new ArrayList<>();
|
||||
Map<String, ProcessLockOrException> results = ProcessLockUtils.createMany(keys, "typeA", "me");
|
||||
for(String key : keys)
|
||||
{
|
||||
ProcessLock processLock = results.get(key).processLock();
|
||||
assertNotNull(processLock.getId());
|
||||
assertNotNull(processLock.getCheckInTimestamp());
|
||||
assertNull(processLock.getExpiresAtTimestamp());
|
||||
processLocks.add(processLock);
|
||||
}
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
// make sure we can't create a second for the same key //
|
||||
/////////////////////////////////////////////////////////
|
||||
assertThatThrownBy(() -> ProcessLockUtils.create("1", "typeA", "you"))
|
||||
.isInstanceOf(UnableToObtainProcessLockException.class)
|
||||
.hasMessageContaining("Held by: " + QContext.getQSession().getUser().getIdReference())
|
||||
.hasMessageContaining("with details: me")
|
||||
.hasMessageNotContaining("expiring at: 20")
|
||||
.matches(e -> ((UnableToObtainProcessLockException) e).getExistingLock() != null);
|
||||
|
||||
/////////////////////////////////////////////////////////
|
||||
// make sure we can create another for a different key //
|
||||
/////////////////////////////////////////////////////////
|
||||
ProcessLockUtils.create("4", "typeA", "him");
|
||||
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
// make sure we can create another for a different type (same key) //
|
||||
/////////////////////////////////////////////////////////////////////
|
||||
ProcessLockUtils.create("1", "typeB", "her");
|
||||
|
||||
////////////////////////////////////////////////////////////////////
|
||||
// now try to create some that will overlap, but one that'll work //
|
||||
////////////////////////////////////////////////////////////////////
|
||||
keys = List.of("3", "4", "5");
|
||||
results = ProcessLockUtils.createMany(keys, "typeA", "me");
|
||||
for(String key : List.of("3", "4"))
|
||||
{
|
||||
UnableToObtainProcessLockException exception = results.get(key).unableToObtainProcessLockException();
|
||||
assertNotNull(exception);
|
||||
}
|
||||
|
||||
ProcessLock processLock = results.get("5").processLock();
|
||||
assertNotNull(processLock.getId());
|
||||
assertNotNull(processLock.getCheckInTimestamp());
|
||||
assertNull(processLock.getExpiresAtTimestamp());
|
||||
processLocks.add(processLock);
|
||||
|
||||
//////////////////////////////
|
||||
// make sure we can release //
|
||||
//////////////////////////////
|
||||
ProcessLockUtils.releaseMany(processLocks);
|
||||
|
||||
///////////////////////////////////////////////////////
|
||||
// make sure can re-lock 1 now after it was released //
|
||||
///////////////////////////////////////////////////////
|
||||
processLock = ProcessLockUtils.create("1", "typeA", "you");
|
||||
assertNotNull(processLock.getId());
|
||||
assertEquals("you", processLock.getDetails());
|
||||
}
|
||||
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
**
|
||||
*******************************************************************************/
|
||||
@Test
|
||||
void testManyWithSleep() throws QException
|
||||
{
|
||||
/////////////////////////////////////////////////
|
||||
// make sure that we can create multiple locks //
|
||||
/////////////////////////////////////////////////
|
||||
List<String> keys = List.of("1", "2", "3");
|
||||
Map<String, ProcessLockOrException> results0 = ProcessLockUtils.createMany(keys, "typeB", "me");
|
||||
for(String key : keys)
|
||||
{
|
||||
assertNotNull(results0.get(key).processLock());
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////
|
||||
// try again - and 2 and 3 should fail, if we don't sleep //
|
||||
////////////////////////////////////////////////////////////
|
||||
keys = List.of("2", "3", "4");
|
||||
Map<String, ProcessLockOrException> results1 = ProcessLockUtils.createMany(keys, "typeB", "you");
|
||||
assertNull(results1.get("2").processLock());
|
||||
assertNull(results1.get("3").processLock());
|
||||
assertNotNull(results1.get("4").processLock());
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// try another insert, which should initially succeed for #5, then sleep, and eventually succeed on 3 & 4 as well //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
keys = List.of("3", "4", "5");
|
||||
Map<String, ProcessLockOrException> results2 = ProcessLockUtils.createMany(keys, "typeB", "them", Duration.of(1, ChronoUnit.SECONDS), Duration.of(3, ChronoUnit.SECONDS));
|
||||
for(String key : keys)
|
||||
{
|
||||
assertNotNull(results2.get(key).processLock());
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
// make sure that we have a different ids for some that expired and then succeeded post-sleep //
|
||||
////////////////////////////////////////////////////////////////////////////////////////////////
|
||||
assertNotEquals(results0.get("3").processLock().getId(), results2.get("3").processLock().getId());
|
||||
assertNotEquals(results1.get("4").processLock().getId(), results2.get("4").processLock().getId());
|
||||
|
||||
}
|
||||
|
||||
}
|
Reference in New Issue
Block a user