mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 13:10:44 +00:00
QQQ-14 checkpoint, pre-demo
This commit is contained in:
2
pom.xml
2
pom.xml
@ -51,7 +51,7 @@
|
||||
<dependency>
|
||||
<groupId>com.kingsrook.qqq</groupId>
|
||||
<artifactId>qqq-backend-core</artifactId>
|
||||
<version>0.0.0-20220628.161829-14</version>
|
||||
<version>0.0.0-20220629.151616-15</version>
|
||||
</dependency>
|
||||
|
||||
<!-- 3rd party deps specifically for this module -->
|
||||
|
@ -81,7 +81,7 @@ public abstract class AbstractBaseFilesystemAction<FILE>
|
||||
/*******************************************************************************
|
||||
** Get a string that represents the full path to a file.
|
||||
*******************************************************************************/
|
||||
protected abstract String getFullPathForFile(FILE file);
|
||||
public abstract String getFullPathForFile(FILE file);
|
||||
|
||||
/*******************************************************************************
|
||||
** In contrast with the DeleteAction, which deletes RECORDS - this is a
|
||||
@ -105,7 +105,7 @@ public abstract class AbstractBaseFilesystemAction<FILE>
|
||||
** and a file at /foo/bar/baz.txt
|
||||
** give us just the baz.txt part.
|
||||
*******************************************************************************/
|
||||
public abstract String stripBackendAndTableBasePathsFromFileName(FILE file, QBackendMetaData sourceBackend, QTableMetaData sourceTable);
|
||||
public abstract String stripBackendAndTableBasePathsFromFileName(String filePath, QBackendMetaData sourceBackend, QTableMetaData sourceTable);
|
||||
|
||||
|
||||
|
||||
|
@ -95,7 +95,7 @@ public class AbstractFilesystemAction extends AbstractBaseFilesystemAction<File>
|
||||
** Get a string that represents the full path to a file.
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
protected String getFullPathForFile(File file)
|
||||
public String getFullPathForFile(File file)
|
||||
{
|
||||
return (file.getAbsolutePath());
|
||||
}
|
||||
@ -176,10 +176,10 @@ public class AbstractFilesystemAction extends AbstractBaseFilesystemAction<File>
|
||||
** give us just the baz.txt part.
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
public String stripBackendAndTableBasePathsFromFileName(File file, QBackendMetaData backend, QTableMetaData table)
|
||||
public String stripBackendAndTableBasePathsFromFileName(String filePath, QBackendMetaData backend, QTableMetaData table)
|
||||
{
|
||||
String tablePath = getFullBasePath(table, backend);
|
||||
String strippedPath = file.getAbsolutePath().replaceFirst(".*" + tablePath, "");
|
||||
String strippedPath = filePath.replaceFirst(".*" + tablePath, "");
|
||||
return (strippedPath);
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,7 @@ import com.kingsrook.qqq.backend.core.modules.interfaces.QBackendModuleInterface
|
||||
import com.kingsrook.qqq.backend.core.processes.implementations.etl.basic.BasicETLProcess;
|
||||
import com.kingsrook.qqq.backend.core.utils.StringUtils;
|
||||
import com.kingsrook.qqq.backend.module.filesystem.base.FilesystemBackendModuleInterface;
|
||||
import com.kingsrook.qqq.backend.module.filesystem.base.actions.AbstractBaseFilesystemAction;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
@ -77,6 +78,8 @@ public class BasicETLCleanupSourceFilesFunction implements FunctionBody
|
||||
{
|
||||
throw (new QException("Backend " + table.getBackendName() + " for table " + sourceTableName + " does not support this function."));
|
||||
}
|
||||
AbstractBaseFilesystemAction actionBase = filesystemModule.getActionBase();
|
||||
actionBase.preAction(backend);
|
||||
|
||||
String sourceFilePaths = runFunctionRequest.getValueString(BasicETLCollectSourceFileNamesFunction.FIELD_SOURCE_FILE_PATHS);
|
||||
if(!StringUtils.hasContent(sourceFilePaths))
|
||||
@ -91,7 +94,7 @@ public class BasicETLCleanupSourceFilesFunction implements FunctionBody
|
||||
String moveOrDelete = runFunctionRequest.getValueString(FIELD_MOVE_OR_DELETE);
|
||||
if(VALUE_DELETE.equals(moveOrDelete))
|
||||
{
|
||||
filesystemModule.getActionBase().deleteFile(runFunctionRequest.getInstance(), table, sourceFile);
|
||||
actionBase.deleteFile(runFunctionRequest.getInstance(), table, sourceFile);
|
||||
}
|
||||
else if(VALUE_MOVE.equals(moveOrDelete))
|
||||
{
|
||||
@ -100,9 +103,9 @@ public class BasicETLCleanupSourceFilesFunction implements FunctionBody
|
||||
{
|
||||
throw (new QException("Field [" + FIELD_DESTINATION_FOR_MOVES + "] is missing a value."));
|
||||
}
|
||||
File file = new File(sourceFile);
|
||||
String destinationPath = destinationForMoves + File.separator + file.getName();
|
||||
filesystemModule.getActionBase().moveFile(runFunctionRequest.getInstance(), table, sourceFile, destinationPath);
|
||||
String filePathWithoutBase = actionBase.stripBackendAndTableBasePathsFromFileName(sourceFile, backend, table);
|
||||
String destinationPath = destinationForMoves + File.separator + filePathWithoutBase;
|
||||
actionBase.moveFile(runFunctionRequest.getInstance(), table, sourceFile, destinationPath);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -82,6 +82,8 @@ public class FilesystemSyncFunction implements FunctionBody
|
||||
AbstractBaseFilesystemAction processingActionBase = processingModule.getActionBase();
|
||||
processingActionBase.preAction(processingBackend);
|
||||
|
||||
Integer maxFilesToSync = runFunctionRequest.getValueInteger(FilesystemSyncProcess.FIELD_MAX_FILES_TO_ARCHIVE);
|
||||
int syncedFileCount = 0;
|
||||
for(Map.Entry<String, Object> sourceEntry : sourceFiles.entrySet())
|
||||
{
|
||||
try
|
||||
@ -98,6 +100,13 @@ public class FilesystemSyncFunction implements FunctionBody
|
||||
|
||||
String processingPath = processingActionBase.getFullBasePath(processingTable, processingBackend);
|
||||
processingActionBase.writeFile(processingBackend, processingPath + File.separator + sourceFileName, bytes);
|
||||
syncedFileCount++;
|
||||
|
||||
if(maxFilesToSync != null && syncedFileCount >= maxFilesToSync)
|
||||
{
|
||||
LOG.info("Breaking after syncing " + syncedFileCount + " files");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch(Exception e)
|
||||
@ -119,7 +128,7 @@ public class FilesystemSyncFunction implements FunctionBody
|
||||
|
||||
for(Object file : files)
|
||||
{
|
||||
String fileName = actionBase.stripBackendAndTableBasePathsFromFileName(file, backend, table);
|
||||
String fileName = actionBase.stripBackendAndTableBasePathsFromFileName(actionBase.getFullPathForFile(file), backend, table);
|
||||
rs.put(fileName, file);
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,9 @@ import com.kingsrook.qqq.backend.core.model.metadata.processes.QProcessMetaData;
|
||||
** - if any files exist in the source, but not in the archive, then:
|
||||
** - copy the file to both the archive and the processing table.
|
||||
**
|
||||
** The maxFilesToArchive field can be used to only sync up to that many files
|
||||
** (an help an initial sync, if you want to do it in smaller batches)
|
||||
**
|
||||
** The idea being, that the source is read-only, and we want to move files out of
|
||||
** processing after they've been processed - and the archive is what we can have
|
||||
** in-between the two.
|
||||
@ -52,6 +55,7 @@ public class FilesystemSyncProcess
|
||||
public static final String FIELD_SOURCE_TABLE = "sourceTable";
|
||||
public static final String FIELD_ARCHIVE_TABLE = "archiveTable";
|
||||
public static final String FIELD_PROCESSING_TABLE = "processingTable";
|
||||
public static final String FIELD_MAX_FILES_TO_ARCHIVE = "maxFilesToArchive";
|
||||
|
||||
|
||||
|
||||
@ -69,6 +73,7 @@ public class FilesystemSyncProcess
|
||||
.withInputData(new QFunctionInputMetaData()
|
||||
.addField(new QFieldMetaData(FIELD_SOURCE_TABLE, QFieldType.STRING))
|
||||
.addField(new QFieldMetaData(FIELD_ARCHIVE_TABLE, QFieldType.STRING))
|
||||
.addField(new QFieldMetaData(FIELD_MAX_FILES_TO_ARCHIVE, QFieldType.INTEGER).withDefaultValue(Integer.MAX_VALUE))
|
||||
.addField(new QFieldMetaData(FIELD_PROCESSING_TABLE, QFieldType.STRING)));
|
||||
|
||||
return new QProcessMetaData()
|
||||
|
@ -175,7 +175,7 @@ public class AbstractS3Action extends AbstractBaseFilesystemAction<S3ObjectSumma
|
||||
** Get a string that represents the full path to a file.
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
protected String getFullPathForFile(S3ObjectSummary s3ObjectSummary)
|
||||
public String getFullPathForFile(S3ObjectSummary s3ObjectSummary)
|
||||
{
|
||||
return (s3ObjectSummary.getKey());
|
||||
}
|
||||
@ -189,10 +189,10 @@ public class AbstractS3Action extends AbstractBaseFilesystemAction<S3ObjectSumma
|
||||
** give us just the baz.txt part.
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
public String stripBackendAndTableBasePathsFromFileName(S3ObjectSummary file, QBackendMetaData backend, QTableMetaData table)
|
||||
public String stripBackendAndTableBasePathsFromFileName(String filePath, QBackendMetaData backend, QTableMetaData table)
|
||||
{
|
||||
String tablePath = getFullBasePath(table, backend);
|
||||
String strippedPath = file.getKey().replaceFirst(".*" + tablePath, "");
|
||||
String strippedPath = filePath.replaceFirst("^/*" + stripLeadingSlash(tablePath), "");
|
||||
return (strippedPath);
|
||||
}
|
||||
|
||||
@ -229,6 +229,7 @@ public class AbstractS3Action extends AbstractBaseFilesystemAction<S3ObjectSumma
|
||||
{
|
||||
QBackendMetaData backend = instance.getBackend(table.getBackendName());
|
||||
String bucketName = ((S3BackendMetaData) backend).getBucketName();
|
||||
destination = stripLeadingSlash(stripDuplicatedSlashes(destination));
|
||||
|
||||
getS3Utils().moveObject(bucketName, source, destination);
|
||||
}
|
||||
|
@ -22,7 +22,7 @@
|
||||
package com.kingsrook.qqq.backend.module.filesystem.s3.model.metadata;
|
||||
|
||||
|
||||
import com.kingsrook.qqq.backend.core.model.metadata.QSecretReader;
|
||||
import com.kingsrook.qqq.backend.core.instances.QMetaDataVariableInterpreter;
|
||||
import com.kingsrook.qqq.backend.module.filesystem.base.model.metadata.AbstractFilesystemBackendMetaData;
|
||||
import com.kingsrook.qqq.backend.module.filesystem.s3.S3BackendModule;
|
||||
|
||||
@ -193,14 +193,15 @@ public class S3BackendMetaData extends AbstractFilesystemBackendMetaData
|
||||
/*******************************************************************************
|
||||
** Called by the QInstanceEnricher - to do backend-type-specific enrichments.
|
||||
** Original use case is: reading secrets into fields (e.g., passwords).
|
||||
** TODO - migrate to use @InterpretableFields (and complete that impl on core side)
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
public void enrich()
|
||||
{
|
||||
super.enrich();
|
||||
QSecretReader secretReader = new QSecretReader();
|
||||
accessKey = secretReader.readSecret(accessKey);
|
||||
secretKey = secretReader.readSecret(secretKey);
|
||||
QMetaDataVariableInterpreter interpreter = new QMetaDataVariableInterpreter();
|
||||
accessKey = interpreter.interpret(accessKey);
|
||||
secretKey = interpreter.interpret(secretKey);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -175,7 +175,9 @@ public class S3Utils
|
||||
*******************************************************************************/
|
||||
public void writeFile(String bucket, String key, byte[] contents)
|
||||
{
|
||||
getAmazonS3().putObject(bucket, key, new ByteArrayInputStream(contents), new ObjectMetadata());
|
||||
ObjectMetadata objectMetadata = new ObjectMetadata();
|
||||
objectMetadata.setContentLength(contents.length);
|
||||
getAmazonS3().putObject(bucket, key, new ByteArrayInputStream(contents), objectMetadata);
|
||||
}
|
||||
|
||||
|
||||
|
@ -52,6 +52,8 @@ class FilesystemSyncProcessTest
|
||||
@Test
|
||||
public void test() throws Exception
|
||||
{
|
||||
TestUtils.cleanInstanceFiles();
|
||||
|
||||
QTableMetaData sourceTable = defineTable("source");
|
||||
QTableMetaData archiveTable = defineTable("archive");
|
||||
QTableMetaData processingTable = defineTable("processing");
|
||||
@ -61,6 +63,7 @@ class FilesystemSyncProcessTest
|
||||
function.getInputMetaData().getFieldThrowing(FilesystemSyncProcess.FIELD_SOURCE_TABLE).setDefaultValue(sourceTable.getName());
|
||||
function.getInputMetaData().getFieldThrowing(FilesystemSyncProcess.FIELD_ARCHIVE_TABLE).setDefaultValue(archiveTable.getName());
|
||||
function.getInputMetaData().getFieldThrowing(FilesystemSyncProcess.FIELD_PROCESSING_TABLE).setDefaultValue(processingTable.getName());
|
||||
// function.getInputMetaData().getFieldThrowing(FilesystemSyncProcess.FIELD_MAX_FILES_TO_ARCHIVE).setDefaultValue(1);
|
||||
|
||||
QInstance qInstance = TestUtils.defineInstance();
|
||||
qInstance.addTable(sourceTable);
|
||||
@ -74,6 +77,7 @@ class FilesystemSyncProcessTest
|
||||
String basePath = ((FilesystemBackendMetaData) qInstance.getBackend(TestUtils.BACKEND_NAME_LOCAL_FS)).getBasePath();
|
||||
writeTestFile(basePath, sourceTable, "1.txt", "x");
|
||||
writeTestFile(basePath, sourceTable, "2.txt", "x");
|
||||
// writeTestFile(basePath, sourceTable, "3.txt", "x");
|
||||
writeTestFile(basePath, archiveTable, "2.txt", "x");
|
||||
|
||||
//////////////////////
|
||||
|
Reference in New Issue
Block a user