Initial version of ETL action

This commit is contained in:
Darin Kelkhoff
2022-04-12 13:21:54 -05:00
parent f58a59438b
commit d7c7d46122
5 changed files with 366 additions and 0 deletions

View File

@ -0,0 +1,58 @@
/*
* Copyright © 2021-2022. Kingsrook LLC <contact@kingsrook.com>. All Rights Reserved.
*/
package com.kingsrook.qqq.backend.core.actions.etl;
import java.util.List;
import com.kingsrook.qqq.backend.core.actions.InsertAction;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.actions.insert.InsertRequest;
import com.kingsrook.qqq.backend.core.model.actions.insert.InsertResult;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.etl.QDataBatch;
import com.kingsrook.qqq.backend.core.model.etl.QDataSource;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.model.metadata.QTableMetaData;
import com.kingsrook.qqq.backend.core.model.session.QSession;
import com.kingsrook.qqq.backend.core.utils.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/*******************************************************************************
**
*******************************************************************************/
public class RunETL
{
private static final Logger LOG = LogManager.getLogger(RunETL.class);
/*******************************************************************************
**
*******************************************************************************/
public void run(QInstance instance, QSession session, QDataSource source, QTableMetaData destination) throws QException
{
List<String> batchIdentifiers = source.listAvailableBatches();
if(CollectionUtils.nullSafeHasContents(batchIdentifiers))
{
for(String identifier : batchIdentifiers)
{
QDataBatch batch = source.getBatch(identifier, destination);
InsertRequest insertRequest = new InsertRequest(instance);
insertRequest.setTableName(destination.getName());
insertRequest.setSession(session);
insertRequest.setRecords(batch.getRecords());
InsertAction insertAction = new InsertAction();
InsertResult insertResult = insertAction.execute(insertRequest);
System.out.println("** Inserted [" + insertResult.getRecords().size() + "] records into table [" + destination.getName() + "].");
for(QRecord record : insertRequest.getRecords())
{
System.out.println(" Inserted [" + record.getValueString("firstName") + "][" + record.getValueString("lastName") + "].");
}
source.discardBatch(batch);
}
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright © 2021-2022. Kingsrook LLC <contact@kingsrook.com>. All Rights Reserved.
*/
package com.kingsrook.qqq.backend.core.model.etl;
import java.util.List;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
/*******************************************************************************
**
*******************************************************************************/
public class QDataBatch
{
private String identity; // e.g., a full path to a file
private List<QRecord> records;
/*******************************************************************************
** Getter for identity
**
*******************************************************************************/
public String getIdentity()
{
return identity;
}
/*******************************************************************************
** Setter for identity
**
*******************************************************************************/
public void setIdentity(String identity)
{
this.identity = identity;
}
/*******************************************************************************
** Fluent setter for identity
**
*******************************************************************************/
public QDataBatch withIdentity(String identity)
{
this.identity = identity;
return (this);
}
/*******************************************************************************
** Getter for records
**
*******************************************************************************/
public List<QRecord> getRecords()
{
return records;
}
/*******************************************************************************
** Setter for records
**
*******************************************************************************/
public void setRecords(List<QRecord> records)
{
this.records = records;
}
/*******************************************************************************
** Setter for records
**
*******************************************************************************/
public QDataBatch withRecords(List<QRecord> records)
{
this.records = records;
return (this);
}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright © 2021-2022. Kingsrook LLC <contact@kingsrook.com>. All Rights Reserved.
*/
package com.kingsrook.qqq.backend.core.model.etl;
import java.util.List;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.metadata.QTableMetaData;
/*******************************************************************************
**
*******************************************************************************/
public interface QDataSource
{
List<String> listAvailableBatches();
QDataBatch getBatch(String identity, QTableMetaData destination) throws QException;
void discardBatch(QDataBatch batch);
}

View File

@ -0,0 +1,165 @@
/*
* Copyright © 2021-2022. Kingsrook LLC <contact@kingsrook.com>. All Rights Reserved.
*/
package com.kingsrook.qqq.backend.core.model.etl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import com.kingsrook.qqq.backend.core.adapters.CsvToQRecordAdapter;
import com.kingsrook.qqq.backend.core.exceptions.QException;
import com.kingsrook.qqq.backend.core.model.data.QRecord;
import com.kingsrook.qqq.backend.core.model.metadata.QTableMetaData;
import org.apache.commons.io.FileUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.oro.io.GlobFilenameFilter;
/*******************************************************************************
**
*******************************************************************************/
public class QFileSystemDataSource implements QDataSource
{
private static final Logger LOG = LogManager.getLogger(QFileSystemDataSource.class);
private String path;
private String glob;
/*******************************************************************************
** Getter for path
**
*******************************************************************************/
public String getPath()
{
return path;
}
/*******************************************************************************
** Setter for path
**
*******************************************************************************/
public void setPath(String path)
{
this.path = path;
}
/*******************************************************************************
** Fluent setter for path
**
*******************************************************************************/
public QFileSystemDataSource withPath(String path)
{
this.path = path;
return this;
}
/*******************************************************************************
** Getter for glob
**
*******************************************************************************/
public String getGlob()
{
return glob;
}
/*******************************************************************************
** Setter for glob
**
*******************************************************************************/
public void setGlob(String glob)
{
this.glob = glob;
}
/*******************************************************************************
** Fluent setter for glob
**
*******************************************************************************/
public QFileSystemDataSource withGlob(String glob)
{
this.glob = glob;
return this;
}
@Override
public List<String> listAvailableBatches()
{
List<String> rs = new ArrayList<>();
File directory = new File(path);
System.out.println("Listing available batches at [" + path + "].");
for(String entry : Objects.requireNonNull(directory.list(new GlobFilenameFilter(glob))))
{
String entryPath = directory + File.separator + entry;
if(new File(entryPath).isFile())
{
rs.add(entryPath);
}
}
System.out.println("Found [" + rs.size() + "] batches.");
return (rs);
}
@Override
public QDataBatch getBatch(String identity, QTableMetaData table) throws QException
{
File file = new File(identity);
if(!file.exists())
{
throw new QException("File [" + identity + "] was not found.");
}
if(!file.isFile())
{
throw new QException("File [" + identity + "] is not a regular file.");
}
try
{
System.out.println("Reading batch file [" + identity + "].");
String contents = FileUtils.readFileToString(file);
List<QRecord> qRecords = new CsvToQRecordAdapter().buildRecordsFromCsv(contents, table, null);// todo!!
System.out.println("Read [" + qRecords.size() + "] records from batch file.");
return (new QDataBatch().withIdentity(identity).withRecords(qRecords));
}
catch(IOException e)
{
throw new QException("Error reading file", e);
}
}
@Override
public void discardBatch(QDataBatch batch)
{
File file = new File(batch.getIdentity());
File trashFile = new File("/tmp/" + UUID.randomUUID());
if(file.renameTo(trashFile))
{
System.out.println("Discard batch file [" + batch.getIdentity() + "] to trash file [" + trashFile + "].");
}
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright © 2021-2022. Kingsrook LLC <contact@kingsrook.com>. All Rights Reserved.
*/
package com.kingsrook.qqq.backend.core.actions.etl;
import com.kingsrook.qqq.backend.core.model.etl.QDataSource;
import com.kingsrook.qqq.backend.core.model.etl.QFileSystemDataSource;
import com.kingsrook.qqq.backend.core.model.metadata.QInstance;
import com.kingsrook.qqq.backend.core.utils.TestUtils;
import org.junit.jupiter.api.Test;
/*******************************************************************************
**
*******************************************************************************/
class RunETLTest
{
@Test
public void testRun() throws Exception
{
RunETL runETL = new RunETL();
QDataSource dataSource = new QFileSystemDataSource()
.withPath("/tmp/etl-source")
.withGlob("*.csv");
QInstance qInstance = TestUtils.defineInstance();
runETL.run(qInstance, TestUtils.getMockSession(), dataSource, qInstance.getTable("person"));
}
}