Monday, November 9, 2015

Data Lineage internals in PDI 6.0

In Pentaho Data Integration 6.0, we released a great new capability to collect data lineage for PDI transformations and jobs.  Data lineage is an oft-overloaded term, but for the purposes of this blog I will be talking about the flow of data from external sources into steps/entries and possibly out to other external targets. Basically we keep track of all fields as they are created, split, aggregated, transformed, etc. over the course of a transformation or job.  When jobs or transformations call other jobs or transformations, that relationship is also captured. So in that sense you can follow your data all the way through your PDI process.

The code for the current data lineage capability is entirely open source and is available on GitHub here: https://github.com/pentaho/pentaho-metaverse. You may see the term "metaverse" listed throughout the code and documentation (including the project name itself). The term was a pet name for what we envisioned the end product to be: a universe of metadata and relationships between all the artifacts and concepts in the Pentaho stack.  Whether that vision is realized in the same way depends on the roadmap; it is very possible that the needs of Pentaho's customers will drive the data lineage capabilities in a different direction.

Approach

Collecting lineage information for PDI is non-trivial. It may seem like the fields, steps, and operations are readily available such that the relationships could easily be discovered. However, PDI is a very flexible and powerful tool, with APIs that are more general than uniform; flexibility has been a more important design goal than introspection.  For example, getting the list of fields that are output from a transformation step involves calling the getStepFields() API method. This lets the step report its own outgoing fields, and many times the step needs to know the incoming fields before it can properly report its output fields, so the step in turn calls the previous steps' getStepFields() methods.  In the case of a Table Input step, the query is actually executed so that the metadata of the ResultSet is available to determine the output fields. This requires a valid database connection and introduces extra latency into the lineage collection.
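
As a rough illustration (this is not the lineage code itself, and the transformation file and step name below are hypothetical), here is how the outgoing fields of a step can be asked for via the PDI Java API:

    import org.pentaho.di.core.KettleEnvironment;
    import org.pentaho.di.core.row.RowMetaInterface;
    import org.pentaho.di.core.row.ValueMetaInterface;
    import org.pentaho.di.trans.TransMeta;
    import org.pentaho.di.trans.step.StepMeta;

    public class StepFieldsExample {
      public static void main( String[] args ) throws Exception {
        KettleEnvironment.init();                                    // bootstrap PDI plugins/registry
        TransMeta transMeta = new TransMeta( "my_transform.ktr" );   // hypothetical transformation file
        StepMeta step = transMeta.findStep( "Table input" );         // hypothetical step name

        // Ask the step for its outgoing row layout; under the covers the step may need
        // to ask its predecessors, and a Table Input step will actually run its query
        RowMetaInterface fields = transMeta.getStepFields( step );
        for ( ValueMetaInterface field : fields.getValueMetaList() ) {
          System.out.println( field.getName() + " : " + field.getTypeDesc() );
        }
      }
    }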

Other considerations include variables and parameters. It is possible to parameterize things like table names, field names, etc. This makes it impossible to collect accurate data lineage based on a transformation "at-rest", i.e. during design time. Even using parameters' default values doesn't work, as the default value may be meant to fail the transformation (to ensure valid parameter values are passed in).  For this reason, data lineage collection is performed at run-time, such that the fields and values are all resolved and known.  There is one caveat to this that I'll talk about at the end of the blog, but it is unsupported, undocumented, and geared more toward the community for the time being.
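
To make the design-time problem concrete, here is a tiny sketch (hypothetical variable and table names) using PDI's variable API: until the variable is set at run time, the SQL below has no concrete table name to build lineage from.

    import org.pentaho.di.core.variables.VariableSpace;
    import org.pentaho.di.core.variables.Variables;

    public class VariableResolutionExample {
      public static void main( String[] args ) {
        // At design time ${TABLE_NAME} is just a placeholder, so there is no table to trace
        String sql = "SELECT id, name FROM ${TABLE_NAME}";

        // At run time the variable gets a value and the reference can be resolved
        VariableSpace space = new Variables();
        space.setVariable( "TABLE_NAME", "CUSTOMERS" );
        System.out.println( space.environmentSubstitute( sql ) );  // SELECT id, name FROM CUSTOMERS
      }
    }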

At present, the architecture and flow for collecting lineage is as follows:

- Just before a transformation is executed, a graph model is created and associated with the running Trans object. We use Tinkerpop 2.6 for this (more on that in a moment).

- If a step/entry is data-driven, we collect lineage information as each row is processed. Data-driven means the step behaves differently based on values coming in via fields in each row. For example, a Table Input step that gets parameters from a previous step, or a REST step that gets its URLs from an incoming field. This is in contrast to a variable or parameter, which is defined and resolved once at the beginning of execution and does not change over the course of the execution.

- Once a transformation/job has finished, we iterate over each step/entry to collect the rest of the lineage information. This is done using a StepAnalyzer or JobEntryAnalyzer. These are individual objects responsible for collecting lineage information for a particular step (Table Output, e.g.), and there are generic versions of each in the event that no specific one exists.  There are also top-level analyzers for transformations and jobs, which actually perform the iteration and add top-level lineage information to the graph.

- The GenericStepMetaAnalyzer and GenericJobEntryMetaAnalyzer make a best-guess effort to collect accurate lineage. There is no encompassing API to collect all the various operations and business logic performed in each step. So all fields are assumed to be pass-through (meaning the structure of a named field -- type, e.g. -- hasn't changed as a result of the logic) and any new fields have no relationships to the incoming fields (as we don't know what those relationships are). To report full lineage, a step/entry needs a specific analyzer that understands the business logic of that step or entry.

- There are "decorator" analyzer interfaces (and base implementations) that can be associated with step/entry analyzers, most notably ExternalResourceStepAnalyzer and StepDatabaseConnectionAnalyzer (and the JobEntry versions thereof). These (when present for a step/entry analyzer) are called by the lineage collector to get relationships between steps (and/or their fields) to resources outside the transformation, such as text files, databases, etc.

The workhorse in this situation (the "lineage collector") is implemented in TransformationRuntimeExtensionPoint (there's a Job version too, of course). This is the entry point called just before a transformation starts, namely at the TransformationStartThreads extension point (see the full list here). Instead of implementing multiple extension point plugins to be activated at the various points during execution, the TransListener interface provided the level of interaction we wanted, so the extension point plugin simply registers a TransListener (also implemented by the TransformationRuntimeExtensionPoint object itself), which will be called back by the PDI internals.
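
A stripped-down sketch of that pattern (not the actual TransformationRuntimeExtensionPoint, and with a hypothetical plugin id) looks roughly like this:

    import org.pentaho.di.core.exception.KettleException;
    import org.pentaho.di.core.extension.ExtensionPoint;
    import org.pentaho.di.core.extension.ExtensionPointInterface;
    import org.pentaho.di.core.logging.LogChannelInterface;
    import org.pentaho.di.trans.Trans;
    import org.pentaho.di.trans.TransAdapter;

    // Hypothetical plugin: invoked at TransformationStartThreads, it registers itself
    // as a TransListener so it is called back as the transformation starts/finishes.
    @ExtensionPoint(
        id = "MyLineageExtensionPoint",                         // hypothetical plugin id
        extensionPointId = "TransformationStartThreads",
        description = "Registers a listener that collects lineage at run time" )
    public class MyLineageExtensionPoint extends TransAdapter implements ExtensionPointInterface {

      @Override
      public void callExtensionPoint( LogChannelInterface log, Object object ) throws KettleException {
        if ( object instanceof Trans ) {
          ( (Trans) object ).addTransListener( this );
        }
      }

      @Override
      public void transFinished( Trans trans ) {
        // in the real code, this spins off a thread that runs the full lineage analysis
      }
    }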

The transStarted() method of TransformationRuntimeExtensionPoint creates a new "root node" associated with the client (Spoon, e.g.). This provides a well-known entry point for querying the graph if no other information is known.  When a graph model is created, nodes for all the concepts (transformation, job, database connection, step, field, etc.) are added to the graph as well. The method also creates a future runner for the lineage analysis, which will be called when the transformation is complete.

The transFinished() method spins off a new thread to perform the full lineage analysis. The Runnable calls into the parent entity's analyzer, i.e. a TransformationAnalyzer or JobAnalyzer. The top-level analyzer adds its own lineage/metadata to the graph, then iterates over the steps/entries so their analyzers can add their lineage information (see Graph Model and Usage below).

NOTE: In the code you will see lots of references to ExecutionProfile. This may be tied to the lineage graph someday (and indeed there is some data common to both), but for now it is there to collect the sort of information the PDI Operations Mart and logging do, but in a uniform fashion with a standard format (JSON).

Graph Model and Usage

PDI's data lineage model is based on a property graph model. A graph is composed of nodes (concepts, entities, etc.) and edges connecting nodes (representing relationships between nodes). Nodes and edges can have properties (such as name = "my transformation" or relationship = "knows"). For our model, the nodes are things like the executed jobs/transformations, steps, stream fields, database connections, etc. Also the model includes "concept nodes" that allow for more targeted graph queries. For example, the graph includes a concept node labelled "Transformation", and all executed transformations in that graph have basically an "is-a" relationship with that concept node. In practice, it is a "parentconcept" edge from the concept node to the instance(s) of that concept. In our example, we could use it to start a query from the Transformation concept node and find all nodes connected to it via an outgoing "parentconcept" edge. This query returns nodes corresponding to all transformations executed for this lineage artifact.
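
As a small illustration of the concept-node pattern (using the Blueprints API from Tinkerpop, described below, rather than the actual metaverse builder code), the following sketch creates a Transformation concept node, links an executed transformation to it, and queries for all instances:

    import com.tinkerpop.blueprints.Direction;
    import com.tinkerpop.blueprints.Graph;
    import com.tinkerpop.blueprints.Vertex;
    import com.tinkerpop.blueprints.impls.tg.TinkerGraph;

    public class ConceptNodeExample {
      public static void main( String[] args ) {
        Graph graph = new TinkerGraph();   // simple in-memory Blueprints graph

        // The "Transformation" concept node
        Vertex transformationConcept = graph.addVertex( null );
        transformationConcept.setProperty( "name", "Transformation" );

        // An executed transformation instance, linked to its concept node
        Vertex myTrans = graph.addVertex( null );
        myTrans.setProperty( "name", "my transformation" );
        graph.addEdge( null, transformationConcept, myTrans, "parentconcept" );

        // Find every transformation executed for this lineage artifact
        for ( Vertex v : transformationConcept.getVertices( Direction.OUT, "parentconcept" ) ) {
          String name = v.getProperty( "name" );
          System.out.println( name );
        }
      }
    }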

For our property graph model implementation, we chose the open-source project Tinkerpop. The 3.x line of Tinkerpop has been accepted into the Apache Incubator, and I certainly congratulate them on that achievement!  Tinkerpop 3.x has absorbed all the 2.x products into a single product, and represents an impressive leap forward in terms of graph planning/processing engines. Having said that, Tinkerpop 3 requires Java 8, and since PDI 6.0 supports Java 7, we had to use the deprecated 2.6 version.  However, 2.x had more than enough functionality for us; we just had to bring in the pieces we needed.  Those include the following:

Blueprints: This is the generic graph API, upon which all other Tinkerpop products are built, and useful in its own right to work at a low level with the graph model.

Pipes: This is the dataflow framework that allows for graph traversal and processing using a pipeline architecture (hence the name). The process pipeline(s) are themselves modelled as a graph (called a process graph).

Gremlin: This is the actual graph traversal language, available as a Java API as well as a fluent Groovy DSL. Graph queries in Pentaho data lineage are materialized as Gremlin statements, which are executed as a process graph using the Pipes framework.

Frames: This is basically an ORM from the graph/process models to Java objects. It allows the lineage code to offer a Java method whose body is essentially a Gremlin query that returns the specified object. There is some overhead involved with this ORM (due to the amount of reflection and such that is needed), so we only use Frames at present for integration testing. However it did increase our productivity and made our tests much less verbose :)

Viewing PDI Lineage Information

There's already a quite excellent blog on this subject by Pedro Alves; I highly recommend it, as it explains where PDI stores lineage, as well as how to retrieve and display it using third-party tools such as yEd.


Design-time Lineage Information

As I mentioned, the lineage capability for PDI 6.0 is first-and-foremost a runtime lineage collection engine. However there are some APIs and such for accessing the lineage of an active transformation in Spoon. For example, the Data Services optimizations dialog uses something called the LineageClient to determine the "origin fields" for those fields exposed by a data service, in order to find possible push-down optimizations.

LineageClient contains methods offering a domain-level API for querying the lineage graphs. Inside each of these methods you'll see the Gremlin-Java API at work. Note: we decided not to include Groovy as an additional compile/runtime dependency, to keep things simple and smaller in the build. This makes the usage more verbose (see the code), but there was no loss of functionality for us; there's a Tinkerpop wiki page on how to do Gremlin in Java.
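
For a flavor of what Gremlin looks like from Java (a simplified, hypothetical query rather than an excerpt from LineageClient), a traversal from the Transformation concept node to the names of all executed transformations might look like this:

    import java.util.List;

    import com.tinkerpop.blueprints.Vertex;
    import com.tinkerpop.gremlin.java.GremlinPipeline;

    public class GremlinJavaExample {

      // Given the "Transformation" concept node, return the names of all executed
      // transformations reachable via outgoing "parentconcept" edges.
      public static List<Object> executedTransformationNames( Vertex transformationConcept ) {
        return new GremlinPipeline<Vertex, Vertex>( transformationConcept )
            .out( "parentconcept" )     // follow concept -> instance edges
            .property( "name" )         // pull each instance's "name" property
            .toList();
      }
    }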

To actually build the lineage graph for the active transformation, PDI 6.0 has TransOpenedExtensionPoint and TransChangedExtensionPoint plugins, each of which will create and populate the lineage graph for the active TransMeta object, using TransExtensionPointUtil.addLineageGraph() to do so.  This doesn't need to happen on its own thread, as we can't collect data-driven lineage at design time and we don't dive all the way down into executed transformations. The latter is because some transformations are dynamically created (using metadata injection, for example).

So the extension points create and maintain the lineage graph for the active TransMeta, and the LineageClient can query (at the domain level) said graph.  The graph(s) themselves are stored in the LineageGraphMap and are thus accessible to anybody (using the active TransMeta as the key).  Similarly, the runtime graphs are available in the TransLineageHolderMap during their execution lifetime.

Get Involved

If you're looking to use the lineage capability from an external application, check out the HTTP API for lineage. If you'd like to get involved in the codebase, one great way is to add a StepAnalyzer or JobEntryAnalyzer for a step/entry that isn't covered yet.  The documentation for how to contribute these is on help.pentaho.com.  If you want to know which steps/entries have analyzers, start up PDI and (using the same HTTP API base as in the above link) point at cxf/lineage/info/steps (or .../entries for Job Entry analyzers).
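
For a rough idea of the shape of such a contribution (MyStepMeta is a made-up step metadata class, and the exact package names and signatures are best taken from the docs and the existing analyzers in the pentaho-metaverse project), a custom step analyzer extends the StepAnalyzer base class and fills in a few methods:

    import java.util.HashSet;
    import java.util.Set;

    import org.pentaho.di.trans.step.BaseStepMeta;
    import org.pentaho.metaverse.api.IMetaverseNode;
    import org.pentaho.metaverse.api.MetaverseAnalyzerException;
    import org.pentaho.metaverse.api.StepField;
    import org.pentaho.metaverse.api.analyzer.kettle.step.StepAnalyzer;

    // Hypothetical analyzer for a made-up step whose metadata class is MyStepMeta
    public class MyStepAnalyzer extends StepAnalyzer<MyStepMeta> {

      @Override
      public Set<Class<? extends BaseStepMeta>> getSupportedSteps() {
        Set<Class<? extends BaseStepMeta>> supported = new HashSet<Class<? extends BaseStepMeta>>();
        supported.add( MyStepMeta.class );   // which step type(s) this analyzer handles
        return supported;
      }

      @Override
      protected Set<StepField> getUsedFields( MyStepMeta meta ) {
        // report the incoming fields this step actually reads
        return new HashSet<StepField>();
      }

      @Override
      protected void customAnalyze( MyStepMeta meta, IMetaverseNode rootNode )
          throws MetaverseAnalyzerException {
        // add step-specific nodes and links (e.g. "derives" relationships between fields) here
      }
    }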

Summary

Hopefully this sheds some light on the innards of the Pentaho Data Integration 6.0 data lineage capability.  This opens a world of opportunities both inside and outside Pentaho for maintaining provenance, determining change impact, auditing, and metadata management.  Cheers!