Wednesday, February 4, 2015

Apache Pig UDF: Call a PDI transformation

For my latest fun side project, I looked at the integration of Pentaho Data Integration (PDI) and Apache Pig.  From the website: "Apache Pig is a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs." If you substitute "graphical" for "high-level" and "PDI" for "Apache Pig", you get a pretty accurate description of the Pentaho Data Integration product.  For this reason I thought it natural to look at the ways PDI and Pig could play together in the same pigpen, so to speak :)

Pentaho Data Integration has long offered an "Pig Script Executor" job entry, which allows the user to submit a Pig script to a Hadoop cluster (or a local Pig instance), which allows orchestration of data analysis programs written in Pig. However it doesn't integrate with other PDI capabilities (such as transformations) that are also data analysis programs.

My idea was to kind of turn the integration idea inside-out, so instead of PDI orchestrating Pig jobs, I wanted to leverage PDI transformations as data analysis programs inside of Pig.  I didn't want to have to include a PDI deployment inside a Pig deployment or vice versa; rather I envisioned a system where both Pig and PDI were installed, and the former could locate and use the latter.  This involved creating the following:
  1. A Pig UDF that can inject data into a PDI transformation and collect it on the other side, without needing PDI as a compile-time dependency
  2. A way to bridge the Pig UDF to a PDI deployment
  3. A way to transform Pig data types/values to/from PDI data types/values

For #2, I noticed that there are many places where this bridge could be leveraged (Hive, Spark, e.g.), so I created a project called pdi-bridge that could be used generally in other places. The project does two things:

First, it supplies classes that will run a transformation, inject rows, and collect result rows using an intermediate data model.

Second, there is a Java file (that is not compiled or included in the pdi-bridge JAR) called KettleBridge, this file needs to be copied into whatever integration project needs it, which in this case was my custom Pig UDF project.  The KettleBridge looks for a system property (then an environment variable) called KETTLE_HOME which needs to point at a valid PDI deployment. It then does some classloading and reflection stuff in order to wire up its static API methods to the PDI instance:

addRow - injects a row to the given transformation
finishTransformation - signals for the given transformation to stop running
getFieldHolder - returns the field holder (intermediate data model) for the given field name
getKettleClassloader - returns a classloader that includes the necessary PDI JARs
init - initializes the bridge
nextRow - retrieves a row from the PDI transformation "result set"
startTransformation - starts the given transformation

The project that uses the bridge is responsible for using getFieldHolder() and other methods to translate from the project's data types to PDI's data types. In my Pig UDF example I have methods like getFieldHolderList() and getKettleType(), which translate a Pig schema into a form which will ultimately be translated into PDI row metadata.  When calling addRow(), the objects passed in must be able to be used by PDI directly, so it is best to translate them to Java types such as String or Integer so the Injector step (see below) can convert the values.

You can see the code in my pdi-pig-udfs project, including the copy of KettleBridge and the RunKettleTrans class, the former providing for goal #2 above, and the latter providing for goals #1 and #3 above.

Next, I needed to get my JARs into a place where they could be used by Pig. Getting a Pig script to find and load the UDF jar was easy, using the REGISTER command (see Pig script below). In order to keep things simple, the KettleBridge class expects to find the pdi-bridge JAR in the same directory as the JAR that contains the KettleBridge class itself. So in this case the pdi-pig-udfs JAR and the pdi-bridge JAR need to be in the same folder.  You can get the pdi-bridge JAR by building from source (see my project link above) or by downloading it here. Same goes for the pdi-pig-udfs JAR, it can be downloaded here.  For my proof-of-concept, I built the pdi-pig-udfs JAR from source, then manually copied the pdi-bridge jar into the build/libs folder next to the other JAR.

Now that the plumbing was in place, I needed a transformation that would be executed as the UDF.  This transformation by convention needs the following things:
  • An Injector step called INPUT to be used to get rows into the transformation
  • A step called OUTPUT used to return rows to the UDF

For my example, I wanted to take firstname and lastname from a Pig script and inject them into a transformation that would uppercase the firstname, then concatenate the two and return a single field called fullname.  The transformation looks like this (don't mind the Text File Input step, that is for local testing outside Pig):

You can find the actual transformation on Gist.  Lastly, I needed a Pig script that registers the UDF JAR, loads in the data, then calls the UDF and dumps the output:

REGISTER '/Users/mburgess/git/pdi-pig-udfs/build/libs/pdi-pig-udfs-1.0.jar';

A = LOAD '../customers-100.txt' USING ';', 'NO_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER') AS (id: int, lastname: chararray, firstname: chararray);

B = FOREACH A GENERATE pdi.pig.RunKettleTrans('/Users/mburgess/pdi-pig.ktr', firstname, lastname) AS fullname;


Notice I am using an absolute pathname to my UDF JAR, that is the same directory that contains the pdi-bridge JAR.  Then I'm loading customers-100.txt, getting the first three fields, and calling them "id", "lastname" and "firstname".  The FOREACH..GENERATE command will pass in the tuples to the UDF (called pdi.pig.RunKettleTrans). In this case it will pass in a tuple including the transformation filename, then firstname and lastname from the A dataset.  The transformation returns a single field called fullname, and the Pig output (from DUMP B) looks like this:

((FSJ-FIRSTNAME jwcdf-name))
((TUM-FIRSTNAME flhxu-name))
((GFE-FIRSTNAME xthfg-name))
((BNL-FIRSTNAME ulzrz-name))
((ONX-FIRSTNAME oxhyr-name))

Here is the command I ran, from the Pig 0.14 directory, to set KETTLE_HOME to PDI EE 5.2 and execute the above script with Pig in local mode:

KETTLE_HOME=~/pdi-ee- bin/pig -x local ~/pdi-udf.pig

This all might appear terribly complicated, but if you'd like to call PDI transformations from Pig, you should only need to do the following:

1) Download the pdi-pig-udfs and pdi-bridge JARs into a single location. If you're running on Hadoop you might need to put them in HDFS or in some common location on the cluster where Pig can find them while running MapReduce (I only tested in local mode)

2) Create a transformation according to the rules above (Injector step called INPUT, e.g.)

3) Create a Pig script that calls pdi.pig.RunKettleTrans and passes in the location of the transformation and whatever other fields you have identified in your Injector step. You should also be able to use the results within the Pig script as well.

As this was just a proof-of-concept, there are probably a few bugs in there, and I wouldn't be surprised if more work is needed to get it going on a Hadoop cluster running Pig, but I wanted to show that Pig and other technologies are very approachable and amenable to PDI integration.