mirror of
https://github.com/Kingsrook/qqq.git
synced 2025-07-18 05:01:07 +00:00
Add TableSyncProcess; remove previously built htmls & pdfs
This commit is contained in:
2104
docs/Reports.pdf
2104
docs/Reports.pdf
File diff suppressed because it is too large
Load Diff
406
docs/implementations/TableSync.adoc
Normal file
406
docs/implementations/TableSync.adoc
Normal file
@ -0,0 +1,406 @@
|
||||
== TableSyncProcess
|
||||
include::../variables.adoc[]
|
||||
|
||||
The `TableSyncProcess` is designed to help solve the common use-case where you have a set of records in one table,
|
||||
you want to apply a transformation to them, and then you want them to result in records in a different table.
|
||||
|
||||
=== Sample Scenario
|
||||
For example, you may be receiving an import file feed from a partner - say, a CSV file of order records
|
||||
- that you need to synchronize into your local database's orders table.
|
||||
|
||||
=== High Level Steps
|
||||
At a high-level, the steps of this task are:
|
||||
|
||||
1. Get the records from the partner's feed.
|
||||
2. Decide Insert/Update - e.g., if you already have any of these orders in your database table, in which case, they need updated, versus ones
|
||||
you don't have, which need inserted.
|
||||
3. Map fields from the partner's order record to your own fields.
|
||||
4. Store the necessary records in your database (insert or update).
|
||||
|
||||
=== What you need to tell QQQ
|
||||
`TableSyncProcess`, as defined by QQQ, knows how to do everything for a job like this, other than the part that's unique to your business.
|
||||
Those unique parts that you need to tell QQQ are:
|
||||
|
||||
* What are the source & destination tables?
|
||||
* What are the criteria to identify source records to be processed?
|
||||
* What are the key fields are that "link" together records from the source & destination tables?
|
||||
** For the example above, imagine you have a "partnerOrderNo" field in your database's order table - then you'd need
|
||||
to tell QQQ what field in the partner's table provides the value for that field in your table.
|
||||
* How do fields / values from the source table map to fields & values in the destination table?
|
||||
|
||||
=== Detailed Steps
|
||||
To get more specific, in a QQQ `TableSyncProcess`, here's what happens for each of the high-level steps given above:
|
||||
|
||||
* Getting records from the partner's feed (done by QQQ):
|
||||
** Records from the source will be fetched via an Extract step, which runs a query to find the records that need processing.
|
||||
** Depending on your use-case, you may use an `ExtractViaQueryStep` (maybe with a Table Automation) or `ExtractViaBasepullQueryStep`
|
||||
(e.g., if you are polling a remote data source).
|
||||
* Deciding Insert/Update (done by QQQ):
|
||||
** Given a set of records from the source table (e.g., output of the Extract step mentioned above), get values from the "key" field in that table.
|
||||
** Do a lookup in the destination table, where its corresponding "key" field has the values extracted from the source records.
|
||||
** For each source record, if its "key" was found in the destination table, then plan an update to that existing corresponding
|
||||
destination record; else, plan to insert a new record in the destination table.
|
||||
* Mapping values (done by your custom Application code):
|
||||
** Specifically, mapping is done in a subclass of `AbstractTableSyncTransformStep`, in the `populateRecordToStore` method.
|
||||
Of particular interest are these two parameters to that method:
|
||||
*** `QRecord destinationRecord` - as determined by the insert/update logic above, this will either be a
|
||||
new empty record (e.g., for inserting), or a fully populated record from the destination table (for updating).
|
||||
*** `QRecord sourceRecord` - this is the record being processed, from the source table.
|
||||
** This method is responsible for setting values in the `destinationRecord`, and returning that record
|
||||
(unless it has decided that for some reason the record should _not_ be stored, in which case it may return `null`).
|
||||
* Storing the records (done by QQQ):
|
||||
** This is typically done with the `LoadViaInsertOrUpdateStep`, though it is customizable if additional work is needed (e.g., via a
|
||||
subclass of `LoadViaInsertOrUpdateStep`, or a more custom subclass of `AbstractLoadStep`).
|
||||
|
||||
=== Bare-bones Example
|
||||
For this example, let's assume we're setting up a partner-order-feed as described above, with the following details:
|
||||
|
||||
* We have records from a partner in a table named `"partnerOrderImport"` (let's assume the records may have been created
|
||||
using the QQQ `FilesystemImporter` process).
|
||||
These records have the following fields:
|
||||
** `orderNo, date, city, state, postal, whseNo`
|
||||
* We need to synchronize those records with table in our database named `"order"`, with the following fields corresponding to those from the partner:
|
||||
** `partnerOrderNo, orderDate, shipToCity, shipToState, shipToZipCode, warehouseId`
|
||||
* The same conceptual order may appear in the `"partnerOrderImport"` multiple times, e.g., if they update some data on the order and re-transmit it to us.
|
||||
Meaning, we need to update our `"order"` records when we receive a new version an existing order.
|
||||
|
||||
To use `*TableSyncProcess*` for solving this use-case, we'll need to create 2 things:
|
||||
|
||||
1. A `QProcessMetaData` object, which we can create using the builder object provided by class `TableSyncProcess`.
|
||||
Note that this type of process is specialization of the standard QQQ `StreamedETLWithFrontendProcess`, as described elsewhere in this documentation.
|
||||
2. A subclass of `AbstractTableSyncTransformStep`, where we implement our mapping logic.
|
||||
Again, note that `AbstractTableSyncTransformStep` is a subclass of `AbstractTransformStep`, as used by `StreamedETLWithFrontendProcess`.
|
||||
|
||||
And to be good programmers, we'll actually create a 3rd thing:
|
||||
|
||||
[start=3]
|
||||
. A unit test for our Transform step.
|
||||
|
||||
Here are examples of these pieces of code:
|
||||
|
||||
[source,java]
|
||||
.Example of building process using the TableSyncProcess builder:
|
||||
----
|
||||
// the false argument below tells the build we are not a basepull-style process
|
||||
QProcessMetaData processMetaData = TableSyncProcess.processMetaDataBuilder(false)
|
||||
|
||||
// give our process a unique name within our QInstance
|
||||
.withName("partnerOrderToLocalOrderProcess")
|
||||
|
||||
// tell the process to what class to use for transforming records from source to destination
|
||||
.withSyncTransformStepClass(PartnerOrderToOrderTransformStep.class)
|
||||
|
||||
.getProcessMetaData();
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example implementation of an AbstractTableSyncTransformStep
|
||||
----
|
||||
public class PartnerOrderToOrderTransformStep extends AbstractTableSyncTransformStep
|
||||
{
|
||||
@Override
|
||||
protected SyncProcessConfig getSyncProcessConfig()
|
||||
{
|
||||
return (new SyncProcessConfig(
|
||||
"partnerOrderImport", // source tableName
|
||||
"orderNo", // source table key fieldName
|
||||
"order", // destination tableName
|
||||
"partnerOrderNo" // destination table foreign key fieldName
|
||||
));
|
||||
}
|
||||
|
||||
@Override
|
||||
public QRecord populateRecordToStore(RunBackendStepInput runBackendStepInput, QRecord destinationRecord, QRecord sourceRecord) throws QException
|
||||
{
|
||||
// map simple values from source table to destination table
|
||||
destinationRecord.setValue("orderDate", sourceRecord.get("date"));
|
||||
destinationRecord.setValue("shipToAddressCity", sourceRecord.get("city"));
|
||||
destinationRecord.setValue("shipToAddressState", sourceRecord.get("state"));
|
||||
destinationRecord.setValue("shipToAddressZipCode", sourceRecord.get("postal"));
|
||||
return (destinationRecord);
|
||||
}
|
||||
|
||||
}
|
||||
----
|
||||
|
||||
[source,java]
|
||||
.Example Unit Test for a transform step
|
||||
----
|
||||
@Test
|
||||
void testTransformStep()
|
||||
{
|
||||
// insert 1 test order, that will be updated by the transform step
|
||||
Integer existingId = new InsertAction().execute(new InsertInput("order").withRecords(List.of(
|
||||
new QRecord().withValue("partnerOrderNo", 101).withValue("shipToState", "IL")
|
||||
))).getRecords().get(0).getValueInteger("id");
|
||||
|
||||
// set up input for the step - a list of 2 of the partner's orders
|
||||
RunBackendStepInput input = new RunBackendStepInput();
|
||||
input.setRecords(List.of(
|
||||
new QRecord().withValue("orderNo", 101).withValue("state", "NY"), // will update the order above
|
||||
new QRecord().withValue("orderNo", 102).withValue("state", "CA") // will insert a new order
|
||||
));
|
||||
RunBackendStepOutput output = new RunBackendStepOutput();
|
||||
|
||||
// run the code under test - our transform step
|
||||
new PartnerOrderToOrderTransformStep().run(input, output);
|
||||
|
||||
// Note that by just running the transform step, no records have been stored.
|
||||
// We can assert against the output of this step.
|
||||
|
||||
assertEquals(existingId, output.getRecords().get(0).getValue("id"));
|
||||
assertEquals(101, output.getRecords().get(0).getValue("partnerOrderNo"));
|
||||
assertEquals("NY", output.getRecords().get(0).getValue("shipToState"));
|
||||
|
||||
assertNull(output.getRecords().get(1).getValue("id"));
|
||||
assertEquals(102, output.getRecords().get(1).getValue("partnerOrderNo"));
|
||||
assertEquals("CA", output.getRecords().get(1).getValue("shipToState"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testFullProcess()
|
||||
{
|
||||
// todo! :)
|
||||
}
|
||||
----
|
||||
|
||||
=== Pseudocode process flow
|
||||
Now that we've seen the bare-bones example, let's see an even more detailed breakdown of how a full `TableSyncProcess` works,
|
||||
by looking at its 3 "ETL" steps in pseudocode:
|
||||
|
||||
==== ExtractStep (Producer Thread)
|
||||
|
||||
* Queries source table for records
|
||||
** Often based on Table Automations (e.g., for all newly inserted records) or Basepull pattern (polling for new/updated records).
|
||||
|
||||
==== TransformStep (Consumer Thread)
|
||||
|
||||
* Receives pages of records from `ExtractStep` in the `run` method.
|
||||
* Makes `sourceKeyList` by getting `sourceTableKeyField` values from the records.
|
||||
* Calls `initializeRecordLookupHelper(runBackendStepInput, sourceRecordList)`
|
||||
** Calls `getLookupsToPreLoad` to control which lookups are performed.
|
||||
* Calls `getExistingRecordsByForeignKey(runBackendStepInput, destinationTableForeignKeyField, destinationTableName, sourceKeyList);`
|
||||
** Calls `getExistingRecordQueryFilter(runBackendStepInput, sourceKeyList)` as part of querying the `destinationTable`
|
||||
** Returns the output of `buildExistingRecordsMap(destinationTableForeignKeyField, queryOutput.getRecords())`
|
||||
* foreach input record (from `sourceTable`):
|
||||
** Calls `getExistingRecord(existingRecordsByForeignKey, destinationForeignKeyField, sourceKeyValue)`
|
||||
** if an existing record was returned (and if the syncConfig says `performUpdates`), this record is set as `recordToStore`
|
||||
** else if no existing record was returned (and if the syncConfig says `performInserts`), a new record is set as `recordToStore`
|
||||
** else continue the foreach.
|
||||
** call `populateRecordToStore(runBackendStepInput, recordToStore, sourceRecord)`
|
||||
** if a record is returned it is added to the process step output (to be stored in the LoadStep)
|
||||
|
||||
==== LoadStep (Consumer Thread)
|
||||
|
||||
* Receives records from the output of the `TransformStep`.
|
||||
* Inserts and/or Updates `destinationTable`, with records returned by `populateRecordToStore`
|
||||
|
||||
=== Additional Process Configuration Examples
|
||||
The following examples show how to use additional settings in the `TableSyncProcess` builder.
|
||||
|
||||
==== UI
|
||||
While a `TableSyncProcess` will often run via a schedule and/or automation, we may also want to allow users
|
||||
to manually run it in a UI.
|
||||
|
||||
[source,java]
|
||||
.Making our process available for a UI
|
||||
----
|
||||
QProcessMetaData processMetaData = TableSyncProcess.processMetaDataBuilder(true)
|
||||
.withName("partnerOrderToLocalOrderProcess")
|
||||
.withSyncTransformStepClass(PartnerOrderToOrderTransformStep.class)
|
||||
|
||||
// attach our process to its source table, to show up in UI
|
||||
.withTableName("partnerOrderImport")
|
||||
|
||||
// add some fields to display on the review screen, in UI
|
||||
.withReviewStepRecordFields(List.of(
|
||||
new QFieldMetaData("clientId", QFieldType.STRING).withLabel("Client"),
|
||||
new QFieldMetaData("warehouseId", QFieldType.STRING).withLabel("Warehouse"),
|
||||
new QFieldMetaData("partnerOrderNo", QFieldType.STRING)))
|
||||
.getProcessMetaData();
|
||||
----
|
||||
|
||||
==== Basepull
|
||||
The previous example would work as a Table Automation (e.g., where the list of records identified in the
|
||||
Extract step were determined by the Automation system).
|
||||
However, a second common pattern is to use `Basepull` (e.g., if polling for updated records from a partner API endpoint).
|
||||
|
||||
[source,java]
|
||||
.Configuring our process as a Basepull
|
||||
----
|
||||
// the true argument below tells the build we ARE a basepull-style process
|
||||
// this changes the default extract-step.
|
||||
QProcessMetaData processMetaData = TableSyncProcess.processMetaDataBuilder(false)
|
||||
.withName("partnerOrderToLocalOrderProcess")
|
||||
.withSyncTransformStepClass(PartnerOrderToOrderTransformStep.class)
|
||||
|
||||
// See Basepull documentation for details
|
||||
.withBasepullConfiguration(new BasepullConfiguration())
|
||||
|
||||
// schedule our process to run automatically every minute
|
||||
.withSchedule(new QScheduleMetaData().withRepeatSeconds(60))
|
||||
|
||||
.getProcessMetaData();
|
||||
----
|
||||
|
||||
=== Additional Options in the Transform Step
|
||||
|
||||
==== Specifying to not perform Inserts or not perform Updates
|
||||
We may have a scenario where we want our sync process to never update records if the key is already found in the destination table.
|
||||
We can configure this with an additional optional parameter to the `SyncProcessConfig` constructor:
|
||||
|
||||
[source,java]
|
||||
.Specifying to not do updates in a TableSyncProcess
|
||||
----
|
||||
@Override
|
||||
protected SyncProcessConfig getSyncProcessConfig()
|
||||
{
|
||||
return (new SyncProcessConfig("partnerOrderImport", "orderNo", "order", "partnerOrderNo",
|
||||
true, // performInserts
|
||||
false // performUpdates
|
||||
));
|
||||
}
|
||||
----
|
||||
|
||||
Similarly, we may want to disallow inserts from a particular sync process.
|
||||
The `performInserts` argument to the `SyncProcessConfig` constructor lets us do that:
|
||||
|
||||
[source,java]
|
||||
.Specifying to not do inserts in a TableSyncProcess
|
||||
----
|
||||
@Override
|
||||
protected SyncProcessConfig getSyncProcessConfig()
|
||||
{
|
||||
return (new SyncProcessConfig("partnerOrderImport", "orderNo", "order", "partnerOrderNo",
|
||||
false, // performInserts
|
||||
true // performUpdates
|
||||
));
|
||||
}
|
||||
----
|
||||
|
||||
==== Customizing the query for existing records
|
||||
|
||||
In some cases, a specific Table Sync process may need to refine the query filter that is used
|
||||
to lookup existing records in the destination table (e.g. for determining insert vs. update).
|
||||
|
||||
For example, in our orders-from-a-partner scenario, if we have more than 1 partner sending us orders,
|
||||
where there could be overlapping orderNo values among them - we may have an additional field in our
|
||||
orders table to identify which partner an order came from.
|
||||
So then when we're looking up orders by `partnerOrderNo`, we would need to also include the `partnerId` field
|
||||
in our query, so that we only update orders from the specific partner that we're dealing with.
|
||||
|
||||
To do this (to customize the existing record query filter), we need can just override the method `getExistingRecordQueryFilter`.
|
||||
Generally we would start by calling the `super` version of the method, and then add to it additional criteria.
|
||||
|
||||
[source,java]
|
||||
.Customizing the query filter used to look for existing records
|
||||
----
|
||||
/*******************************************************************************
|
||||
** Define the query filter to find existing records. e.g., for determining
|
||||
** insert vs. update. Subclasses may override this to customize the behavior,
|
||||
** e.g., in case an additional field is needed in the query.
|
||||
*******************************************************************************/
|
||||
protected QQueryFilter getExistingRecordQueryFilter(RunBackendStepInput runBackendStepInput, List<Serializable> sourceKeyList)
|
||||
{
|
||||
QQueryFilter filter = super.getExistingRecordQueryFilter(runBackendStepInput, sourceKeyList);
|
||||
filter.addCriteria(new QFilterCriteria("partnerId", EQUALS, PARTNER_ID));
|
||||
return (filter);
|
||||
}
|
||||
----
|
||||
|
||||
==== More efficient additional record lookups
|
||||
|
||||
It is a common use-case to need to map various ids from a partner's system to ids in your own system.
|
||||
For the orders example, we might need to know what warehouse the order is shipping from.
|
||||
The customer may send their identifier for the warehouse, and we may need to map those identifiers to our own warehouse ids.
|
||||
|
||||
The QQQ-provided class `RecordLookupHelper` exists to help with performing lookups like this,
|
||||
and in particular, it can be used to execute one query to fetch a full table, storing records
|
||||
by a key field, then returning those records without performing additional queries.
|
||||
|
||||
`AbstractTableSyncTransformStep` has a protected `recordLookupHelper` member.
|
||||
If we override the method `getLookupsToPreLoad()`, then this object is
|
||||
populated by calling its `preloadRecords` method with each specified pair of tableNames and fieldNames.
|
||||
|
||||
[source,java]
|
||||
.Specifying tables to pre-load using a RecordLookupHelper
|
||||
----
|
||||
/*******************************************************************************
|
||||
** Specify a list of tableName/keyColumnName pairs to run through
|
||||
** the preloadRecords method of the recordLookupHelper.
|
||||
*******************************************************************************/
|
||||
@Override
|
||||
protected List<Pair<String, String>> getLookupsToPreLoad()
|
||||
{
|
||||
return (List.of(
|
||||
Pair.of("warehouse", "partnerWarehouseNo")
|
||||
));
|
||||
}
|
||||
----
|
||||
|
||||
If we have preloaded some lookups, we can then use them in our `populateRecordToStore` method as follows:
|
||||
[source,java]
|
||||
.Using the recordLookupHelper in populateRecordToStore
|
||||
----
|
||||
// lookup warehouse with partnerWarehouseNo=whseNo from partner, and use our id in destination record
|
||||
String partnerWarehouseNo = sourceRecord.getValue("whseNo");
|
||||
Integer warehouseId = recordLookupHelper.getRecordId("warehouse", "partnerWarehouseNo", whseNo, Integer.class);
|
||||
destinationRecord.setValue("warehouseId", warehouseId);
|
||||
----
|
||||
|
||||
==== Additional override points
|
||||
|
||||
There are more methods which can be overridden in your `AbstractTableSyncTransformStep` subclass,
|
||||
to provide further customizations of behaviors, specifically in the area of dealing with existing
|
||||
records (e.g., the insert/update use-case).
|
||||
[source,java]
|
||||
.Additional AbstractTableSyncTransformStep overrides
|
||||
----
|
||||
|
||||
/*******************************************************************************
|
||||
** Run the existingRecordQueryFilter - to look in the destinationTable for
|
||||
** any records that may need an update (rather than an insert).
|
||||
**
|
||||
** Generally returns a Map, keyed by a Pair of the destinationTableForeignKeyField
|
||||
** and the value in that field. But, for more complex use-cases, one can override
|
||||
** the buildExistingRecordsMap method, to make different keys (e.g., if there are
|
||||
** two possible destinationTableForeignKeyFields).
|
||||
*******************************************************************************/
|
||||
protected Map<Pair<String, Serializable>, QRecord> getExistingRecordsByForeignKey
|
||||
(
|
||||
RunBackendStepInput runBackendStepInput,
|
||||
String destinationTableForeignKeyField,
|
||||
String destinationTableName,
|
||||
List<Serializable> sourceKeyList
|
||||
) throws QException;
|
||||
|
||||
|
||||
/*******************************************************************************
|
||||
** Overridable point where you can, for example, keys in the existingRecordsMap
|
||||
** with different fieldNames from the destinationTable.
|
||||
**
|
||||
** Note, if you're overriding this method, you'll likely also want & need to
|
||||
** override getExistingRecord.
|
||||
*******************************************************************************/
|
||||
protected Map<Pair<String, Serializable>, QRecord> buildExistingRecordsMap
|
||||
(
|
||||
String destinationTableForeignKeyField,
|
||||
List<QRecord> existingRecordList
|
||||
);
|
||||
|
||||
/*******************************************************************************
|
||||
** Given the map of existingRecordsByForeignKey (as built by
|
||||
** getExistingRecordsByForeignKey which calls buildExistingRecordsMap),
|
||||
** get one record from that map, for a given key-value from a source record.
|
||||
**
|
||||
** The destinationForeignKeyField is given as advice if needed (e.g., to see its type)
|
||||
*******************************************************************************/
|
||||
protected QRecord getExistingRecord
|
||||
(
|
||||
Map<Pair<String, Serializable>, QRecord> existingRecordsByForeignKey,
|
||||
QFieldMetaData destinationForeignKeyField,
|
||||
Serializable sourceKeyValue
|
||||
);
|
||||
|
||||
----
|
||||
|
@ -36,10 +36,9 @@ include::misc/ProcessBackendSteps.adoc[leveloffset=+1]
|
||||
=== Table Customizers
|
||||
#todo#
|
||||
|
||||
== QQQ Actions
|
||||
== QQQ Core Actions
|
||||
include::actions/QueryAction.adoc[leveloffset=+1]
|
||||
|
||||
=== GetAction
|
||||
include::actions/GetAction.adoc[leveloffset=+1]
|
||||
|
||||
=== CountAction
|
||||
@ -48,7 +47,6 @@ include::actions/GetAction.adoc[leveloffset=+1]
|
||||
=== AggregateAction
|
||||
#todo#
|
||||
|
||||
=== InsertAction
|
||||
include::actions/InsertAction.adoc[leveloffset=+1]
|
||||
|
||||
=== UpdateAction
|
||||
@ -60,4 +58,6 @@ include::actions/InsertAction.adoc[leveloffset=+1]
|
||||
=== AuditAction
|
||||
#todo#
|
||||
|
||||
== QQQ Default Implementations
|
||||
include::implementations/TableSync.adoc[leveloffset=+1]
|
||||
// later... include::actions/RenderTemplateAction.adoc[leveloffset=+1]
|
||||
|
1396
docs/index.html
1396
docs/index.html
File diff suppressed because it is too large
Load Diff
12863
docs/index.pdf
12863
docs/index.pdf
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user