
This post originally published by Chris Deptula on Tuesday, October 27, 2015

 

I recently attended the Strata-HadoopWorld conference in NYC.  I have been attending this conference for the past few years, and each year a theme emerges.  A few years ago it was SQL on Hadoop; last year it was all Spark.  This year there was a lot of buzz about streaming and continuous data ingestion. Although they are often presented as different topics, the line between these concepts is blurring.

 

To simplify, streaming means working with queues of events that are constantly flowing via a messaging system. Continuous ingestion is used more broadly and is the process of continuously ingesting from any data source, whether that be processing a file as soon as it is received, monitoring the database for new inserts, or streaming from a message queue.

 

One of the challenges with continuous ingestion is making data immediately available for analytics in a performant manner. For smaller amounts of data this can often be done with an RDBMS; for Big Data, a more complex architecture is required. Cloudera is trying to solve some of these challenges with Kudu; however, Kudu is still in beta and should not be used in production. Shortly before Strata-HadoopWorld, I started working on a demo for PentahoWorld using an architecture that solves this problem.

 

Before we go any further, let's take a step back and explain the use case for our PentahoWorld demo of real-time analytics. I wanted to build a system that ingested a stream of tweets about the conference in real time while also making them available for analytics. The analytics are presented using a Pentaho CDE dashboard and a Mondrian cube for slicing and dicing in Pentaho Analyzer. For the dashboard, I performed a bit of Natural Language Processing on the tweets to determine their sentiment, and then made both aggregate metrics and individual tweets available as soon as each tweet was received. For the Pentaho Analyzer slice-and-dice capabilities, I allowed the data to be slightly older (not real time), but did update the underlying cube frequently.

 

How did I do this? At a high level, I built a Pentaho Data Integration (PDI) transformation that acted as a daemon. This transformation used the Twitter Streaming API to get a continuous stream of tweets with the #PWorld15 hashtag, and did not stop processing new data until it was manually stopped. I then used the Stanford Sentiment Analysis plugin for PDI to calculate the Wilson sentiment of each tweet, before writing this data into MongoDB.

 

Once the data was in MongoDB, I built my Pentaho CDE dashboard. Pentaho CDE has the ability to source data from PDI transformations.  I built a few PDI transformations that used the MongoDB input step with Aggregate Pipelines to calculate the metrics I needed. Using Pentaho transformations as the input saved me from having to write a lot of JavaScript. Then it was simply a matter of telling the CDE components to refresh every 10 seconds and I had built my dashboard!
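The PDI transformations behind the dashboard are essentially thin wrappers around MongoDB aggregation pipelines. For readers more comfortable outside of PDI, here is a rough Python/pymongo sketch of the kind of aggregation involved; the database, collection, and field names (pworld15, tweets, created_at, sentiment) are hypothetical, not the demo's actual schema:

# Rough pymongo sketch of a dashboard-style aggregation; database,
# collection, and field names are hypothetical.
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
tweets = client["pworld15"]["tweets"]

# Average sentiment and tweet count per hour -- the kind of aggregate
# metric the CDE components refreshed every 10 seconds.
pipeline = [
    {"$group": {
        "_id": {"$dateToString": {"format": "%Y-%m-%d %H:00", "date": "$created_at"}},
        "avg_sentiment": {"$avg": "$sentiment"},
        "tweet_count": {"$sum": 1},
    }},
    {"$sort": {"_id": 1}},
]

for row in tweets.aggregate(pipeline):
    print(row["_id"], row["tweet_count"], round(row["avg_sentiment"], 3))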

 

For the cube, Pentaho Analyzer requires a SQL interface to the data. It is true that this could be solved by using PDI's data service capability, which allows you to use SQL to query the results of a PDI transformation. However, the "PDI as a data service" option has a limitation: the output data from the transformation must fit in memory -- not practical for Big Data. Instead, I extended the transformation that streams the data from Twitter into MongoDB to also write the data to Hadoop/HDFS. This enabled me to expose the data via Impala, providing the SQL interface Pentaho Analyzer requires.

 

Brilliant! But there was a catch. In Hadoop, a file is not readable until its file handle has been closed. If I never close the file because my transformation is constantly running, then the data will never be available to query. I could have written a file for every record, but this would have resulted in far too many small files and would have been detrimental to performance. Instead, I used the Transformation Executor step in PDI to group the tweets into 10-minute chunks, and then wrote one file every 10 minutes.
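The Transformation Executor configuration itself is a PDI setting rather than code, but the idea it implements is easy to sketch in Python: buffer the incoming stream and flush one file per time window so each file handle is closed promptly. A minimal illustration only; the output path is a placeholder, and the real flow writes to HDFS rather than a local file:

import time

WINDOW_SECONDS = 10 * 60  # one output file per 10-minute chunk

def write_in_chunks(tweet_stream, out_dir="/tmp/tweets"):
    """Buffer incoming records and flush one file per 10-minute window,
    so each file handle is closed and its data becomes queryable."""
    window_start = time.time()
    buffer = []
    for tweet in tweet_stream:
        buffer.append(tweet)
        if time.time() - window_start >= WINDOW_SECONDS:
            path = "{}/tweets_{}.txt".format(out_dir, int(window_start))
            with open(path, "w") as f:   # the real flow writes to HDFS here
                f.write("\n".join(buffer) + "\n")
            buffer = []
            window_start = time.time()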

 

Once I had all of this figured out and working, I tested and encountered yet another problem. Tweets contain newlines and lots of special characters, so storing the data in a plain text file was going to be a challenge. The good news is that there are better file formats for Hadoop than plain text, such as Avro and Parquet. So, instead of using text files, I used the Pentaho Consulting Labs Parquet Output step. This had the dual benefit of easily handling newlines and special characters while also improving the performance of the underlying Cloudera Impala queries.

 

Finally, I created a few Impala tables and a Mondrian cube, and solved my problem. I was able to stream tweets in real time through Pentaho to make them immediately available in a dashboard, and, with a modest delay, also have them available for slice and dice analytics.

 

 

 

This post originally published by Kevin Haas on Tuesday, July 14, 2015

 

When working with our clients, we find a growing number who regard their customer or transaction data as not just an internally leveraged asset, but one that can enrich their relationships with customers and supporting partners. They need to systematically share data and analytics outside their firewall.

 

One of the ways we help them achieve this goal is with custom Data APIs. In this article, we'll show you how to build Data APIs using visual programming within the Pentaho Data Integration interface. Along the way, we'll expose capabilities of the platform that are not always seen by the average user.

 

Inspecting The Field: Inside Pentaho Data Integration

For those who use Pentaho's Data Integration (PDI aka Kettle) platform regularly, you're probably aware of the amazing things you can do. For the uninitiated, PDI is a very powerful Extract-Transform-Load (ETL) technology that lets you access data from virtually any source, perform complex transformations, and push/load that data to virtually any destination. It's commonly used for integrating and shaping data into new forms optimized for analytical purposes (data warehouses, data marts, etc.).

 

However, Pentaho experts know that Kettle is more than an ETL platform. It is powered by an amazing, open, and flexible data management engine that can not only read and write data from relational databases, Hadoop, NoSQL databases, and Web APIs, but can also serve data to other applications.

 

Enter one of the most powerful (but perhaps underutilized) components of the Pentaho Data Integration platform: the Data Integration Server. It can power your APIs.

 

Sowing The Seeds: The Data Integration Server

If you're an Enterprise Edition Pentaho customer, you're probably aware of the capabilities of the Data Integration Server. It's one of the foundational reasons why customers choose the Enterprise Edition of PDI. The Data Integration Server is a server application that allows you to schedule and run jobs and transformations created with PDI's "Spoon" visual development tool.

 

Community Edition users have a similar (but not as sophisticated) version of the Data Integration Server with the included Carte server. If you're a community edition user, it's easy enough to start up your own local Carte server on a port of your choosing. You could start Carte on port 8081 like this:

 

carte.sh 127.0.0.1 8081

or

carte.bat 127.0.0.1 8081

 

There are many configuration options for the Data Integration Server and Carte server that allow them to operate in a variety of modes. See the Pentaho documentation for the current list of options.

 

Bringing It To Life: Calling The Transformation on the Server with ExecuteTrans

The first step in creating a Data API is to develop a Pentaho Data Integration transformation that gathers data from virtually any source: RDBMS, Hadoop, NoSQL databases, Web APIs, etc. Since transformations can be parameterized, you can apply filtering, sorting, or any other customizable logic to your code. Again, this can all be developed using PDI's graphical development tool: Spoon.

 

Once you have installed and launched your Data Integration or Carte server, you can execute your transformations with a simple HTTP call using the ExecuteTrans method. For example, if you have Carte running on port 8081 and want to execute a transformation that is stored in /srv/pentaho/sample.ktr and accepts a parameter named "parm", you can simply call:

 

http://127.0.0.1:8081/kettle/executeTrans/?trans=/srv/pentaho/sample.ktr&parm=parmvalue

 

When you launch this URL within a browser, you won't see a lot of information in return. In fact, you'll get nothing back! But, if you look at the PDI logs on the server, you'll see that the sample.ktr transformation did run. This is all well and good -- you were able to launch a transformation on a server via an HTTP request. But now you want it to return some data!

 

Growing a Transformation into an API: The Output To Servlet Option

To provide an output, the transformation must produce one or more rows of data through a configured output step. To send rows of data out as a response from the ExecuteTrans method, your transformation must pass this data to a Text File Output (or JSON Output) Step, configuring the step to "Pass output to servlet."

 

 

By checking the "Pass output to servlet" box, the transformation will take the stream of data entering this step and push it out as your response. Note that the exact contents of the stream are output as specified by the step's formatting options (e.g. fields, headers, separators). You'll want to configure these options depending on your needs.
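A quick way to exercise an ExecuteTrans endpoint from a script, using the same Carte URL and parameters as the earlier example. The Python requests library is used here purely for illustration; cluster/cluster is a common Carte default login, so substitute your own credentials:

import requests

# Same Carte endpoint and parameters as the executeTrans example above.
url = "http://127.0.0.1:8081/kettle/executeTrans/"
params = {"trans": "/srv/pentaho/sample.ktr", "parm": "parmvalue"}

# Carte is protected by HTTP basic auth; cluster/cluster is a common default.
resp = requests.get(url, params=params, auth=("cluster", "cluster"), timeout=60)
resp.raise_for_status()

# Whatever the "Pass output to servlet" step wrote is the response body.
print(resp.text)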

 

Bringing it to Market: A Taste of Farmers Market Data In A Custom Data API

Let's look at an example. With harvest season upon us, more and more farmers markets are filling up with their bounty. But where can you find local farmers markets? Thankfully, the USDA provides an API that lets you search for nearby markets.

 

http://catalog.data.gov/dataset/farmers-markets-search 

 

This is a powerful set of data, but it's not always easy to use the API (or any API, as provided). In this case, to get a market, you first provide a zip code. The API returns a list of markets with their associated IDs, which you then must use to query the API again in order to get detailed information on each market. Also, we'd like to handle the result data as JSON, but some elements of the API return comma-separated values. Finally, we need to format the returned address a bit better, attaching latitude and longitude for mapping.

 

To begin, we'll create a transform that accepts a parameter called "zip" and does the following (a rough Python sketch of the same flow appears after the list):

 

1. Generate Rows: Initialize with a row for Pentaho to process input parameters.

2. HTTP Client: Call the USDA API to get the list of markets with the input parameters.

3. JSON Input: Parse the JSON that comes back into a list of markets.

4. JSON Input: Extract the market IDs from that list of markets.

5. Calculator: Format a URL string to re-query the API to get detailed market info for each market.

6. HTTP Client: Call the USDA API again for each of the markets to get the details.

7. JSON Input: Extract the detailed address info for each market.

8. HTTP Client: Call the Google Geocoding API to get the latitude and longitude.

9. JSON Input: Extract the latitude and longitude from Google's response.

10. Javascript: Clean up and format the Address and Market info from the source APIs into a nice JSON format.

11. Group By: Group all of the detailed Markets from the API into a single row.

12. Javascript: Format the final JSON.

13. Text File Output: Output the single data element as the servlet response.
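Here is the promised Python sketch of the same flow, for readers who want to see the data movement outside of PDI. The USDA endpoint paths and response field names below are illustrative placeholders rather than the documented API contract, and the Google Geocoding call needs a real API key:

import requests

ZIP = "60634"

# Endpoint paths and response field names are illustrative placeholders.
USDA_SEARCH = "http://search.ams.usda.gov/farmersmarkets/v1/data.svc/zipSearch"
USDA_DETAIL = "http://search.ams.usda.gov/farmersmarkets/v1/data.svc/mktDetail"
GEOCODE = "https://maps.googleapis.com/maps/api/geocode/json"

# Steps 1-4: search markets near a zip code and pull out their IDs.
markets = requests.get(USDA_SEARCH, params={"zip": ZIP}).json()["results"]

out = []
for m in markets:
    # Steps 5-7: re-query the API per market for its detailed record.
    detail = requests.get(USDA_DETAIL, params={"id": m["id"]}).json()["marketdetails"]

    # Steps 8-9: geocode the returned address for mapping.
    geo = requests.get(GEOCODE, params={"address": detail["Address"],
                                        "key": "YOUR_API_KEY"}).json()
    loc = geo["results"][0]["geometry"]["location"]

    # Step 10: clean up the delimited values into JSON-friendly fields.
    out.append({
        "ami_id": m["id"],
        "marketname": m["marketname"],
        "location": detail["Address"],
        "latitude": loc["lat"],
        "longitude": loc["lng"],
        "schedule": [s.strip() for s in detail["Schedule"].split(";") if s.strip()],
        "products": [p.strip() for p in detail["Products"].split(";") if p.strip()],
    })

# Steps 11-13: group everything into a single response document.
print({"market": out})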

 

 

A lot of steps, for sure, but their configuration is relatively simple. This is what the transformation looks like in the Spoon IDE:

 

Once we have developed and tested our transformation using Spoon, we are ready to use our Carte server and the ExecuteTrans method. We open a browser and enter the following URL:

 

http://127.0.0.1:8081/kettle/executeTrans/?trans=/mnt/pentaho/pdi-ee/repository/api/farmers-market.ktr&zip=60634 

 

Note, we can even put Apache in front of the Carte or DI server and do a little URL rewriting to create a friendlier URL. We did this, so instead of the executeTrans URL above, we can call this:

 

http://127.0.0.1:8081/api/farmers-market?zip=60634

 

When you execute the API, it does all the work for you. It calls all the various child APIs, does all the cleanup and formatting, and returns delicious-looking JSON, with all of the items added by the transformation as a result. For example:

 

{
    "market": [
        {
            "ami_id": "1010508",
            "marketname": "Eli's Cheesecake and Wright College Farmer's Market",
            "location": "6701 W. Forest Preserve Drive, Chicago, Illinois, 60634",
            "address1": "6701 West Forest Preserve Drive",
            "address2": "",
            "city": "Chicago",
            "state": "IL",
            "postal_code": "60634",
            "latitude": "41.9588993",
            "longitude": "-87.7944428",
            "schedule": [
                "06/12/2015 to 09/25/2015 Thu: 7:00 AM-1:00 PM"
            ],
            "products": [
                "Baked goods",
                "Crafts and/or woodworking items",
                "Cut flowers",
                "Fresh fruit and vegetables",
                "Honey",
                "Canned or preserved fruits, vegetables, jams, jellies, preserves, dried fruit, etc.",
                "Maple syrup and/or maple products",
                "Poultry",
                "Soap and/or body care products"
            ]
        }
    ]
}

 

Taking It Home: Building Your Own Data APIs

This is a fun illustration of how to pull data from various sources and craft it together into your very own API. All of this was done with out-of-the-box Pentaho Data Integration capabilities, no extra parts required.

 

Clearly, you're not limited in what you can do with this capability. You can easily get data from virtually any source and wrap a custom Data API around it, giving you new capabilities to leverage your existing infrastructure and share data in new ways. Pentaho has a lot of capabilities inside. We know that once you take PDI home from the market and look inside the box, you'll be inspired.

 

If you're interested in seeing how the transformation works, including a look at the transformation that powers the example, just let us know.

This post originally published by Bryan Senseman on Wednesday, October 15, 2014

 

I'm a huge user of Mondrian, the high speed open source OLAP engine behind Pentaho Analyzer, Saiku, and more. While the core Mondrian engine is wonderful, there are times when it doesn't do what I need it to, or exactly what I expect it to. Take this special case that I've encountered at some of my OEM customers, a case I call "Abnormal Genealogy."

 

Mondrian Needs Certain Data To Show Hierarchies

Mondrian has multiple ways to define hierarchies: traditional, ragged, and parent-child. This is the feature that allows you to roll up and roll down metrics for complex parent and child record combinations. Think organization hierarchies, product hierarchies, and employee hierarchies. In many cases, you have the IDs and data to support this cleanly, the way Mondrian expects, where parents have IDs that are less than (e.g. 3 < 14) their associated children's IDs.

 

But having data the way Mondrian expects it isn't always the case, particularly with OEM customers. There can be cases where the IDs of the children are less than the IDs of the parent. For example, my latest client preloads their system with standard/common "items" and then allows the end customer to create new parents (groupings) of these items. As a result, the preloaded items end up with IDs that are less than those of the newly added, customer-specific parent items. This reasonable load pattern breaks the Mondrian assumption that parent IDs are always less than child IDs.

 

Parent-Child IDs Need to Follow Certain Rules

When I first saw this, I thought "This is an edge case." But then I saw it again. And again. Clearly, I had to come up with a repeatable way to solve this problem. Fortunately, the ubiquitous FoodMart database can be used to replicate this issue. It contains an employee hierarchy with parent-child relationships that are pre-populated to fit the Mondrian assumption: all parent IDs are less than child IDs.  

 

Out of the box, FoodMart works correctly, but what happens if we simulate the replacement of a supervisor? We need to insert a new supervisor record in the employee table. This results in the new supervisor record receiving a higher employee_id than the employees which will report to it. The simple assignment of IDs for new records causes this peculiarly common case: abnormal genealogy!

 

To demonstrate this FoodMart scenario, let's look at a Pentaho Analyzer report showing Senior Management and some reporting data before the supervisor replacement:

 

 

Sheri Nowmer, inclusive of herself, has 10 Senior Managers assigned to her, including Roberta Damstra. All is well and good, since the IDs of the parent records (supervisors) are less than the IDs of the child records (managed employees). Now, let's say Roberta retires to sunny Orlando and a new VP, Mr. New Guy, is hired on 1/1/1998. A few simple SQL edits and a Mondrian cache clearing and we should be good to go. We'll assign Mr. New Guy an employee_id of 1206. 1206 is much larger than any other employee_id in our database, so it will easily stick out when we are reviewing the data. It also follows the usual pattern of a new employee record being assigned an ID greater than the maximum ID already used.

 

Running the same report again after clearing the Analyzer and Mondrian caches provides us with this view:

 

 

As expected, Roberta is no longer listed. However, Mr. New Guy also does not appear in this view. We confirm that we inserted his record so that he reports to Sheri Nowmer and that all of Roberta's employees now report to him. So, what happened to Mr. New Guy? His "abnormal genealogy" is causing internal issues for Mondrian. That is, the fact that his ID is greater than the IDs of his reports is causing Mondrian to incorrectly traverse the reporting hierarchy.

 

Solving the Abnormal Genealogy Problem

So, how can we solve this problem? We could dig into the Mondrian source code. It's open source, after all. But I'd prefer to leave revising the core of the Mondrian engine to the Java wizards at Pentaho and in the community. Perhaps we can pull off a bit of Pentaho Data Integration (PDI) magic, with some minor data structure changes, to overcome this gap faster and easier.

 

The solution here is to use PDI to generate an extra set of ordered parent-child IDs. Using PDI, we need to identify the correct hierarchical order and make this order available to Mondrian. The transform shown below demonstrates the steps I used to re-sequence the IDs into the correct genealogical order. Using this approach, parents consistently have smaller IDs than all of their children, making Mondrian happy without the need to modify its core code. Since the solution is data-centric, we're also better protected for future versions of Mondrian.

 

 

So where exactly is the "magic" needed to resequence the IDs? It's in the use of the Closure Generator step, utilizing the existing parent and child IDs (which works regardless of genealogy). In this case, we take the base data and then reorder the ultimate parent rows by distance, as provided by the Closure Generator. Notice (below) that Mr. New Guy, employee_id 1206, is now the ninth "oldest" employee, perfect for a parent!

 

 

The next part of the solution is to update the dimension table with both ordered IDs as they will be needed in the Mondrian hierarchy configuration. If you're maintaining this with a PDI Dimension Lookup/Update step, I suggest using the Punch Thru option for these two fields. 

 

The final bit of data engineering requires you to create a tweaked version of the closure table that uses the ordered parent and original child IDs. This allows the correct groupings for parent aggregation while linking back into the original fact table with the actual IDs.

 

 

Now that our data engineering tasks are complete, all we need is a small adjustment of the hierarchy definition in the Mondrian schema. We simply modify the level definition to utilize the new ordered fields. These changes result in the following dimension definition. (Note that we create new tables called employee2 and employee_closure_cube in order to maintain the original FoodMart structures in case we want to revert to the out-of-the-box configuration later.)

 

 

Back to Analyzer we go, refreshing both caches and... sure enough, Mr. New Guy is official!

 

 

When I first encountered this problem, even Google wasn't able to find an effective solution. I only found a few discussions of the problem in semi-related JIRAs: http://jira.pentaho.com/browse/MONDRIAN-1328 and http://jira.pentaho.com/browse/MONDRIAN-1511. Given my experience with hierarchical data, I expect that this is a fairly common case. I hope you find this approach helpful.


Lessons Learned With Sqoop

Posted by Kevin Haas Dec 15, 2017

This post originally published by Chris Deptula on Wednesday, November 19, 2014.

 

Many of you requested more information on the inner workings of the Sqoop component. Perhaps the best way to explain is via "lessons learned". Here goes...

 

Use the split-by Option

Sqoop is primarily used to extract and import data from databases into HDFS, and vice versa. This was exactly what HDI, our Hadoop Datamart Importer, needed to do. So, I pulled up the Sqoop user manual for a quick refresher and quickly found the import-all-tables option. It seemed like the perfect solution: one command and I could extract every table from the datamart to HDFS. Great! So I tried it...

 

Almost immediately, Sqoop failed. It turns out that Sqoop, by default, extracts data from tables in chunks, and the chunking is done using each table's primary key. If a table has a single-column primary key, the extract works. If no primary key exists, it fails. In our experience, most dimension tables follow this design pattern -- using a single-column surrogate key. However, many, if not most, fact tables do not have a primary key, much less one stored in a single column. This was the case in our test database. When Sqoop tried to extract a fact table, the job failed. Time to start over.

 

It turns out Sqoop has a split-by option specifically designed to override the primary key default. Unfortunately, the split-by option cannot be used with import-all-tables since the split-by column will vary by table. This meant that HDI would have to generate table-specific import statements, each with their own split-by column. Simple enough. I could use Pentaho Data Integration to interrogate the database metadata and generate each table's specific Sqoop import command. Alas, once I began designing, I soon discovered complexity.

 

When using the split-by option, you should choose a column which contains values that are uniformly distributed. If there is a skew in the column data, then the chunks being processed will also be skewed, causing overweighted import tasks to potentially fail due to lack of compute resources (usually memory). Thus, picking the split-by column requires knowledge of table content that an automated process may not have. We'd have to make educated guesses.

 

Like the import-all-tables option, the easiest rule to apply is to use a single column primary key if it exists. We'd have to assume that the column is a surrogate key that was populated with a sequence of values. If no primary key column exists, we look for a date column. Our educated guess is that dates represent events and events tend to be more uniformly distributed than not. If no date column exists, we look for an integer column. If no integer, then we simply grab the first column we can find. Clearly this system is "version 1", with much potential for optimization. We have some ideas and plan on iterating the design in the coming months.
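That "version 1" heuristic is simple enough to sketch in Python. This is a hedged illustration of the rules described above, not HDI's actual implementation, and the column-metadata structure is assumed:

def pick_split_by_column(columns, primary_key):
    """Guess a reasonable --split-by column for a table.

    columns is a list of (name, type) tuples from the source database's
    metadata; primary_key is the list of primary key column names."""
    # 1. A single-column primary key: assume it is a sequence-populated
    #    surrogate key, hence roughly uniformly distributed.
    if len(primary_key) == 1:
        return primary_key[0]
    # 2. Otherwise prefer a date column: events tend to be spread out in time.
    for name, col_type in columns:
        if col_type.lower() in ("date", "datetime", "timestamp"):
            return name
    # 3. Then any integer column.
    for name, col_type in columns:
        if col_type.lower() in ("int", "integer", "bigint", "smallint"):
            return name
    # 4. Last resort: the first column we can find.
    return columns[0][0]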

 

With the acknowledged fuzziness of our split-by algorithm, we knew that we'd have to make HDI configurable so that it only generates code. You can use HDI to interrogate your source database metadata, generate the Sqoop import commands, and then optionally execute those commands. This gave us more control to manually customize and execute the generated import commands.

 

Next, I tried to run my generated Sqoop import commands.

 

Manage Database Fetch Size

The imports were working. I had loaded several dimensions and the first fact table. However, while importing my second, larger fact table, the MapReduce job failed with a GC overhead limit exceeded error. After a lot of Google searching and head scratching, I determined that the MySQL JDBC driver was reading all the rows in one fetch and trying to hold everything in memory. I had to tell MySQL to return the data in batches. The following parameters on my JDBC connection string did the trick.

 

 

?dontTrackOpenResources=true&defaultFetchSize=1000&useCursorFetch=true

 

My issue and solution were specific to MySQL, but I suspect that the same problem could occur with other RDBMSs and JDBC drivers. You've been warned.

 

Convert Dates to Strings

With the fetch problem solved, my Sqoop imports finally completed. My next task was to define Hive tables on the newly loaded data and attempt some queries. All was fine until I selected some date fields. They were returned as integers instead of dates... ugh! Being an experienced programmer, I quickly realized that the integers were "seconds since epoch" -- which most will agree is not a user-friendly way to present dates! I dug through the Sqoop code and recognized that my date values were being treated as timestamps. To prevent Sqoop from converting dates to seconds since epoch, I had to manually map the datatype for each date field. I used Sqoop's map-column-java option to map date columns to the Java String class. The generated Sqoop command had to include an option like this:

 

--map-column-java my_date=String

 

Once I made the change, the dates were output and queryable in yyyy-MM-dd HH:mm:ss.S format instead of seconds since epoch.
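Putting the pieces together, a generated per-table import command ends up looking something like the sketch below. This is a hedged illustration, not HDI's actual generator; the host, schema, credentials, and target directory are placeholders:

def build_sqoop_import(table, split_by, date_columns):
    """Assemble one table-specific Sqoop import command, combining the
    split-by choice, the MySQL fetch-size JDBC options, and the
    date-to-String mappings discussed above."""
    jdbc = ("jdbc:mysql://dbhost/datamart"
            "?dontTrackOpenResources=true&defaultFetchSize=1000&useCursorFetch=true")
    cmd = [
        "sqoop", "import",
        "--connect", jdbc,
        "--username", "etl_user", "-P",          # -P prompts for the password
        "--table", table,
        "--split-by", split_by,
        "--target-dir", "/data/raw/" + table,
    ]
    if date_columns:
        cmd += ["--map-column-java",
                ",".join(col + "=String" for col in date_columns)]
    return " ".join(cmd)

print(build_sqoop_import("fact_orders", "order_date", ["order_date", "ship_date"]))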

 

Success!

It took quite a bit of trial and error, but I was finally able to develop a Hadoop Datamart Importer. The core Sqoop import command generation module works like a champ! If you would like to learn more about how Pentaho Consulting's Data Engineering services can help you with your Hadoop data ingestion needs, give us a call.

This post originally published by Chris Deptula on Tuesday, February 24, 2015.

 

This is the third in a three part blog on working with small files in Hadoop.

 

In my previous blogs, we defined what constitutes a small file and why Hadoop prefers fewer, larger files. We then elaborated on the specific issues that small files cause -- inordinate NameNode memory usage and MapReduce performance degradation -- and began exploring common solutions to these problems. In this blog, I'll close the series by examining some less commonly used alternatives for solving the MapReduce performance problem and the factors to consider when choosing a solution.

 

Solving the MapReduce Performance Problem

In my previous blog, I listed the following solutions for mitigating the MapReduce performance problem:

• Change the ingestion process/interval

• Batch file consolidation

• Sequence files

• HBase

• S3DistCp (If using Amazon EMR)

• Using a CombineFileInputFormat

• Hive configuration settings

• Using Hadoop’s append capabilities

 

My second blog already discussed changing the ingestion process, batch file consolidation, and sequence files. I will cover the remaining options here.

 

HBase

If you are producing numerous small files, storing your data as files in HDFS may not be the best solution. Instead, you might consider using an HBase column store. Using HBase changes your ingestion process from producing many small HDFS files to writing individual records into HBase tables. If your data access pattern is based on well-defined, random-access lookups, HBase may be your best alternative. It is architecturally tuned for high-velocity record inserts, high-volume individual record lookups, and streaming-based analytics. However, if your data access pattern tends toward full file/table scans, then HBase may not be optimal.

 

It is possible to create a Hive table that maps to HBase data; however, query performance in this design will vary. Hive on HBase will shine when a single row or a range of rows is selected, but if your queries tend toward full table scans HBase is very inefficient. Most analytical queries, especially those using group bys, require full table scans.

 

HBase provides the best ability to stream data into Hadoop and make it available for processing in real time. However, balancing the needs of HBase with other cluster processes can be challenging and requires advanced system administration. Additionally, HBase performance is largely dependent on data access patterns and these should be carefully considered before choosing HBase to solve the small file problem.

 

S3DistCp

This solution is only available for users of Amazon EMR. Amazon EMR clusters are designed to be short lived, persisting their data in Amazon S3. Even with Amazon S3, processing a large number of small files still results in launching more map tasks than necessary -- decreasing performance. Enter S3DistCp...

 

S3DistCp is a utility provided by Amazon for distributed copying of data from S3 to ephemeral HDFS or even other S3 buckets. The utility provides the capability to concatenate files together through the use of groupBy and targetSize options. This is useful when you have thousands of small files stored in S3 that you want to process using Amazon EMR. S3DistCp kills two birds with one stone by concatenating many small files and making them appear in faster, ephemeral HDFS storage. There have been reports of as much as 15x performance improvement using this mechanism.

 

For all practical purposes S3DistCp does the same task as the batch file consolidation approach I mentioned in my previous blog. If using Amazon EMR, note that you have a pre-built tool that accomplishes this task.

 

Using a CombineFileInputFormat

The CombineFileInputFormat is an abstract class provided by Hadoop that merges small files at MapReduce read time. The merged files are not persisted to disk; instead, the process reads multiple files and merges them "on the fly" for consumption by a single map task. You gain the benefits of not launching one map task per file and not requiring that multiple files be merged into a single persisted file as part of a preparatory step. This solves the problem of a MapReduce job launching too many map tasks; however, since the job is still reading in multiple small files, random disk IO is still a problem. Additionally, most implementations of the CombineFileInputFormat do not account for data locality and will often pull data from a variety of data nodes over the network.

 

In order to implement this, the CombineFileInputFormat must be extended in Java for different file types. This requires significant development expertise to develop your custom input format classes. However, once written, you can configure a maximum split size and it will merge files until this size is met.

 

Note that since the merged data is not persisted in HDFS, the CombineFileInputFormat does not alleviate the NameNode memory problem. A good example of an implementation of a CombineFileInputFormat may be found here.

 

Hive Configuration Settings

If you notice that Hive creates small files in your Hadoop cluster through "create table as" or "insert overwrite" statements, you can adjust a few Hive-specific configuration settings to mitigate the problem. When used, these settings tell Hive to merge any small files that were created into larger files. However, there is a penalty: Hive will launch an additional MapReduce job, post-query, to perform the merge. Further, the merge is done before Hive indicates to the user that the query has finished processing, rather than occurring asynchronously.

 

It should be noted that these settings only work for files that are created by Hive. If, for example, you create files outside of Hive using another tool such as Sqoop and then move them into the Hive table's directory with an hdfs dfs -mv command, Hive will not merge the files. Therefore, this solution does not help when the files ingested into Hadoop are already small. It is only recommended in Hive-centric architectures where a small performance penalty on insert overwrite and create table as statements is acceptable.

 

The settings to be used are the hive.merge.* options: hive.merge.mapfiles and hive.merge.mapredfiles enable the merge for map-only and full MapReduce jobs respectively, while hive.merge.size.per.task and hive.merge.smallfiles.avgsize control the target size of the merged files and the average output file size below which a merge is triggered.

 

 

Using Hadoop’s Append Capabilities

I would be remiss if I did not mention this final option. Invariably, when I discuss how to handle small files, I hear the question “I was reading a blog and saw that Hadoop has the ability to append to files. Why can’t we just append to existing files?”

 

The story of appends in Hadoop is rather rocky. Append was added in July of 2008 as part of Hadoop 0.19. However, after implementation (as early as October 2008) many issues were found, and append was disabled in 0.19.1. In order to support HBase without risk of data loss, append capabilities were added back to Hadoop in 0.20.2. So, finally, after 0.20.2 it was technically possible to perform appends in Hadoop.

 

Append may be available, but none of the major tools in the Hadoop ecosystem support it: Flume, Sqoop, Pig, Hive, Spark, and Java MapReduce. MapReduce enforces a rule that the output location of a MapReduce job must not exist prior to execution. Due to this rule, it is obviously not possible for MapReduce to append to pre-existing files with its output. Since Sqoop, Pig, and Hive all use MapReduce under the covers, it is also not possible for these tools to support append. Flume does not support append largely because it operates under the assumption that, after a certain threshold -- whether in seconds, bytes, number of events, or seconds of inactivity -- Flume will close the file and never open it again. The Flume community has deemed this sufficient and has not demanded append support.

 

If you truly must use appends in Hadoop, you have to write your own system to perform the ingestion and append to existing files. Additionally, if any of your in-cluster processing requires appending to existing files, you will not be able to use Spark or MapReduce. Therefore using HDFS append capabilities is very complex and should only be used by the most technologically savvy organizations. Without a significant engineering team and support commitment, this option is not recommended.

 

Choosing a Solution

Choosing the best solution for working with small files depends on a variety of questions. It may be necessary to use a combination of these solutions based on access patterns and data requirements. The questions that should be considered include:

• At what point in the data flow are the small files being generated? Are the small files being created at ingestion, or via in-cluster processing?

• What tool is generating the small files? Can changing tool configuration reduce the number of small files?

• What level of technical skill exists within your organization? Do you have the capability to maintain custom input formats or to write your own ingestion engine?

• How often are the small files being generated? How often can small files be merged in order to create large files?

• What sort of data access is required for these small files? Do the files need to be accessible through Hive?

• What maintenance windows exist during which processes can be run inside the cluster to consolidate small files?

• What level of latency is acceptable from MapReduce processes?

 

Once you answer these questions, then you can evaluate and choose the best options.

This post originally published by Chris Deptula on Wednesday, February 18, 2015

 

This is the second in a three part blog on working with small files in Hadoop. In my first blog, I discussed what constitutes a small file and why Hadoop has problems with small files. I defined a small file as any file smaller than 75% of the Hadoop block size, and explained that Hadoop prefers fewer larger files due to NameNode memory usage and MapReduce performance. In this blog, I will discuss solutions to these challenges when small files are truly unavoidable.

 

Solving the NameNode Memory Problem

As discussed in my previous blog, the metadata for every block in Hadoop must be stored in the NameNode's memory. This leads to a practical limit on how many objects can be stored in Hadoop, and it also has implications for startup time and network bandwidth. There are two solutions to this: decrease the number of objects in your Hadoop cluster, or somehow enable the NameNode to use more memory without causing excessive startup times. The most common approaches to this memory problem involve Hadoop Archive (HAR) Files and Federated NameNodes.

 

Hadoop Archive Files

Hadoop archive files alleviate the NameNode memory problem by packing many small files into a larger HAR file, similar to TAR files on Linux. This causes the NameNode to retain knowledge of a single HAR file instead of dozens or hundreds of small files. The files within a HAR file can be accessed using the har:// prefix instead of hdfs://. HAR files are created from files that exist in HDFS. Therefore, a HAR file can consolidate both ingested data as well as data created through normal MapReduce processing. HAR files can be used independent of the technology used to create the small files. There is no common dependency other than HDFS.

 

Although HAR files reduce the NameNode memory footprint for many small files, accessing and processing HAR file content will likely be less efficient. HAR files are still stored randomly on disk and reading a file within a HAR requires two index accesses -- one for the NameNode to find the HAR file itself and one to find the location of the small file within the HAR. Reading a file in a HAR may actually be slower than reading the same file stored natively on HDFS. MapReduce jobs compound this performance issue as they will still launch one map task per file within the HAR.

 

In the end, you have a trade-off: HAR files can solve the NameNode memory issue but may worsen processing performance. If your small files are primarily kept for archival purposes, with infrequent access, then HAR files are a good solution. If the small files are part of your normal processing flow, you may need to rethink your design.

 

Federated NameNodes

Federated NameNodes allow you to have multiple NameNodes in your cluster each storing a subset of the object metadata. This eliminates the need to store all object metadata on a single machine, providing more scale for memory usage. On the surface, solving the small file memory problem with this technique is attractive, but with a little more thought you'll quickly realize the limitations.

 

Federated NameNodes isolate object metadata -- only one NameNode knows about any particular object. This means to get a file you must know which NameNode to use. If your cluster houses multiple tenants and/or siloed applications, then federated NameNodes are a natural fit -- you can isolate object metadata by tenant or application. However, if you are sharing data across all applications within a cluster, this approach is not ideal.

 

Since federation does not actually change the number of objects or blocks within your cluster, it does not solve the MapReduce performance problem. Conversely, federation adds significant and often unnecessary complexity to your Hadoop installation and administration. Federation, when used to solve the small file problem, is often more of a mechanism to hide the small file problem.

 

Solving the MapReduce Performance Problem

As discussed in my previous blog, the MapReduce performance problem is caused by a combination of random disk IO and launching/managing too many map tasks. The solution seems obvious -- have fewer, larger files or launch fewer map tasks; however, this is often easier said than done. Some of the most common solutions include:

  • Change the ingestion process/interval
  • Batch file consolidation
  • Sequence files
  • HBase
  • S3DistCp (If using Amazon EMR)
  • Using a CombineFileInputFormat
  • Hive configuration settings
  • Using Hadoop's append capabilities

We will discuss the first three of these options in this blog, and cover the remainder in the third and final blog of the series.

 

Change the Ingestion Process/Interval

The easiest way to get rid of small files is simply not to generate them in the first place. If your source system generates thousands of small files that are copied into Hadoop, investigate changing your source system to generate a few large files instead, or possibly concatenating files when ingesting into HDFS. If you are only ingesting 10 MB of data every hour, determine if it's possible to only ingest once a day. You'll create 1x240MB file instead of 24x10MB files. However, you may not have control over the source system creating the files or business needs require that you ingest data at interval frequencies such that small files are unavoidable. If small files are truly unavoidable then other solutions should be considered.

 

Batch File Consolidation

When small files are unavoidable, file consolidation is the most common solution. With this option, you periodically run a simple consolidating MapReduce job to read all of the small files in a folder and rewrite them into fewer, larger files. If you have 1000 files in a folder and specify only 5 reducers for the MapReduce job, the 1000 input files will be merged into 5 output files. Followed by some simple HDFS file/folder manipulation, you will have reduced your memory footprint by 200:1 and, likely, improved the performance of future MapReduce processing on the same data.

 

This can be implemented in as little as two lines of Pig -- a LOAD statement followed by a STORE statement over the folder's text files.

 

 

Implementing this in Hive or Java MapReduce is equally easy. These MapReduce jobs obviously require cluster resources while executing, and are often scheduled during off-hours. However, they should be run frequently enough that the performance impact of small files does not become too extreme. Additional logic is often built into these jobs to only merge files in folders where it will have a noticeable performance impact. Merging files in a folder that only contains three files will not yield as great a performance benefit as merging files in a folder that contains 500 small files.

 

Checking folders to determine which ones should be consolidated can be accomplished in a variety of ways. For example, a Pentaho Data Integration job can be used to iterate through a group of folders in HDFS, finding those that meet a minimum set of requirements for consolidation. There is also a pre-written application designed specifically for this task called File Crush, an open source project written by Edward Capriolo. File Crush is not professionally supported, so there is no guarantee it will continue to work with future versions of Hadoop.
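A minimal Python sketch of that folder check, shelling out to the standard hdfs dfs -ls command and flagging folders that hold many files below a size threshold. The thresholds, paths, and the way the ls output is parsed are assumptions to adapt to your own cluster:

import subprocess

BLOCK_SIZE = 128 * 1024 * 1024
SMALL_FILE = int(BLOCK_SIZE * 0.75)   # the 75%-of-block-size rule of thumb
MIN_SMALL_FILES = 100                 # only merge folders where it matters

def needs_consolidation(folder):
    """Return True if the folder holds enough small files to be worth
    running a consolidation job against."""
    out = subprocess.check_output(["hdfs", "dfs", "-ls", folder], text=True)
    small = 0
    for line in out.splitlines():
        parts = line.split()
        # Typical ls rows have 8 fields; field 5 (index 4) is the file size,
        # and directory rows start with 'd'.
        if len(parts) == 8 and not parts[0].startswith("d"):
            if int(parts[4]) < SMALL_FILE:
                small += 1
    return small >= MIN_SMALL_FILES

for folder in ["/data/raw/orders", "/data/raw/clicks"]:   # placeholder paths
    if needs_consolidation(folder):
        print("consolidate:", folder)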

 

Batch file consolidation does not maintain the original file names. If having the original file name is important for processing or understanding where the data originated, batch file consolidation will not work. However, most HDFS designs embed naming semantics at the folder level and not within each file. Adopting this practice removes file name dependencies as an issue.

 

Sequence Files

When there is a requirement to maintain the original filename, a very common approach is to use Sequence files. In this solution, the filename is stored as the key in the sequence file and the file contents are stored as the value. The table below gives an example of how the small files would be stored in a sequence file:

 

 

If you have 10,000 small files, your sequence file would contain 10,000 keys, one per file. Sequence files support block compression and are splittable, meaning that MapReduce jobs would launch one map task per 128MB block instead of one map task per small file. This works very well when you need to maintain the input file name and you are ingesting hundreds or thousands of small files at the same time.

 

However, if you are only ingesting a small number of small files at a time the sequence file does not work as well because Hadoop files are immutable and cannot be appended to. Three 10MB files will result in a 30MB sequence file which is still, by our definition, a small file. Another challenge is that the retrieval of a list of file names within a sequence file requires processing the entire file.

 

Additionally, Hive does not work well with sequence files in this structure. Hive treats all of the data within a value as a single row, so it would not be easy to query this data: the entire contents of a file would appear as one row in Hive. Finally, the Hive tables you create will have access only to the value (the file contents), not to the sequence file key (the filename). It may be possible to write a custom Hive SerDe to solve these challenges, but that is an advanced topic beyond the native capabilities within Hadoop.

 

More details about challenges and limitations of Sequence Files may be found in our Hadoop file formats blog post.

 

Conclusion

In this blog, we discussed the trade-offs of using Hadoop Archive (HAR) files to minimize NameNode memory usage. We discussed and dismissed the use of Federated NameNodes as a panacea for the small file problem. And we introduced some common solutions for small file consolidation -- solutions which improve both NameNode memory usage and MapReduce performance. In my third and final blog, I'll discuss additional, less commonly used, but valuable techniques for file consolidation.

 

 

This post published by Chris Deptula on Wednesday, February 11, 2015

 

This is the first in a 3 part blog on working with small files in Hadoop. Hadoop does not work well with lots of small files and instead wants fewer large files. This is probably a statement you have heard before. But, why does Hadoop have a problem with large numbers of small files? And, what exactly does "small" mean? In the first part of this series I will answer these questions. The subsequent parts will discuss options for solving or working around the small file problem.

 

What is a small file?

A small file can be defined as any file that is significantly smaller than the Hadoop block size. The Hadoop block size is usually set to 64, 128, or 256 MB, trending toward increasingly larger block sizes. Throughout the rest of this blog, examples will use a 128MB block size. I use the rule that if a file is not at least 75% of the block size, it is a small file. However, the small file problem does not just affect small files. If a large number of files in your Hadoop cluster are marginally larger than an increment of your block size, you will encounter the same challenges as with small files. For example, if your block size is 128MB but all of the files you load into Hadoop are 136MB, you will have a significant number of small 8MB blocks. The good news is that solving the small block problem is as simple as choosing an appropriate (larger) block size. Solving the small file problem is significantly more complex. Notice I never mentioned number of rows. Although the number of rows can impact MapReduce performance, it is much less important than file size when determining how to write files to HDFS.
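To make the 75% rule and the "marginally larger than a block" trap concrete, here is a trivial Python illustration using the 128MB block size from the examples above:

MB = 1024 * 1024
BLOCK = 128 * MB   # block size used in the examples

def is_small(file_bytes, block=BLOCK):
    """A file is 'small' if it is under 75% of the block size."""
    return file_bytes < 0.75 * block

def trailing_block(file_bytes, block=BLOCK):
    """Size of the last (possibly tiny) block a file occupies."""
    remainder = file_bytes % block
    return remainder if remainder else block

print(is_small(10 * MB))                # True: a classic small file
print(trailing_block(136 * MB) // MB)   # 8: a 136MB file leaves an 8MB block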

 

Why do small files occur?

The small file problem is an issue Pentaho Consulting frequently sees on Hadoop projects. There are a variety of reasons why companies may have small files in Hadoop, including:

 

  • Companies are increasingly hungry for data to be available near real time, causing Hadoop ingestion processes to run every hour/day/week with only, say, 10MB of new data generated per period.
  • The source system generates thousands of small files which are copied directly into Hadoop without modification.
  • MapReduce jobs are configured with more reducers than necessary, each outputting its own file. Along the same lines, if there is a skew in the data that causes the majority of the data to go to one reducer, then the remaining reducers will process very little data and produce small output files.

 

Why does Hadoop have a small file problem?

There are two primary reasons Hadoop has a small file problem: NameNode memory management and MapReduce performance.

The NameNode memory problem

Every directory, file, and block in Hadoop is represented as an object in memory on the NameNode. As a rule of thumb, each object requires 150 bytes of memory. If you have 20 million files each requiring 1 block, your NameNode needs 6GB of memory. This is obviously quite doable, but as you scale up you eventually reach a practical limit on how many files (blocks) your NameNode can handle. A billion files will require 300GB of memory and that is assuming every file is in the same folder! Let's consider the impact of a 300GB NameNode memory requirement...

  • When a NameNode restarts, it must read the metadata of every file from a cache on local disk. This means reading 300GB of data from disk -- likely causing quite a delay in startup time.
  • In normal operation, the NameNode must constantly track and check where every block of data is stored in the cluster. This is done by listening for data nodes to report on all of their blocks of data. The more blocks a data node must report, the more network bandwidth it will consume. Even with high-speed interconnects between the nodes, simple block reporting at this scale could become disruptive.

 

The optimization is clear. If you can reduce the number of small files on your cluster, you can reduce the NameNode memory footprint, startup time and network impact.
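The rule of thumb above is easy to turn into a back-of-the-envelope calculator. This is a rough sketch only; real NameNode heap sizing involves more variables than this:

OBJECT_BYTES = 150  # rule-of-thumb memory per NameNode object

def namenode_memory_gb(num_files, blocks_per_file=1, num_dirs=1):
    """Rough NameNode heap needed: one object per directory, file, and
    block, at roughly 150 bytes each (decimal GB)."""
    objects = num_dirs + num_files + num_files * blocks_per_file
    return objects * OBJECT_BYTES / 1e9

print(round(namenode_memory_gb(20_000_000)))     # ~6 GB for 20 million single-block files
print(round(namenode_memory_gb(1_000_000_000)))  # ~300 GB for a billion files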

 

The MapReduce performance problem

Having a large number of small files will degrade the performance of MapReduce processing whether it be Hive, Pig, Cascading, Pentaho MapReduce, or Java MapReduce. The first reason is that a large number of small files means a large amount of random disk IO. Disk IO is often one of the biggest limiting factors in MapReduce performance. One large sequential read will always outperform reading the same amount of data via several random reads. If you can store your data in fewer, larger blocks, the performance impact of disk IO is mitigated.

 

The second reason for performance degradation is a bit more complicated, requiring an understanding of how MapReduce processes files and schedules resources. I will use MapReduce version 1 terminology in this explanation, as it is easier to explain than YARN, but the same concepts apply to YARN. When a MapReduce job launches, it schedules one map task per block of data being processed. Each file stored in Hadoop is at least one block. If you have 10,000 files each containing 10 MB of data, a MapReduce job will schedule 10,000 map tasks. Usually Hadoop is configured so that each map task runs in its own JVM. Continuing our example, you will have the overhead of spinning up and tearing down 10,000 JVMs!

 

Your Hadoop cluster only has so many resources. In MapReduce v1, to avoid overloading your nodes, you specify the maximum number of concurrent mappers a node can process. Often the maximum number of concurrent mappers is in the 5 to 20 range. Therefore, to run 10,000 mappers concurrently you would have to have 500 to 2000 nodes. Most Hadoop clusters are much smaller than this, causing the JobTracker to queue map tasks as they wait for open slots. If you have a 20 node cluster with a total of 100 slots, your queue will become quite large and your process will take a long time. And don't forget, your job is likely not the only job competing for cluster resources.

 

If instead of 10,000 10MB files you had 800 128MB files, you would only need 800 map tasks. This would require an order of magnitude less JVM maintenance time and would result in better disk IO. Even though an individual map task processing 128 MB will take longer than a map task processing 10 MB, the sum of all of the processing time will almost always be orders of magnitude faster when processing the 800 larger files.
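The scheduling arithmetic is simple enough to sketch as well. This ignores data locality and real scheduler behavior; it only illustrates the task counts and queueing waves described above:

import math

def map_task_waves(num_files, file_mb, block_mb=128, cluster_slots=100):
    """Number of map tasks a job launches and how many scheduling 'waves'
    it takes on a cluster with a fixed number of concurrent map slots."""
    tasks = num_files * math.ceil(file_mb / block_mb)   # at least one block per file
    return tasks, math.ceil(tasks / cluster_slots)

print(map_task_waves(10_000, 10))   # (10000, 100): 10,000 x 10MB files, 100 waves
print(map_task_waves(800, 128))     # (800, 8): 800 x 128MB files, 8 waves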

 

What can you do if you have small files?

Now that we have discussed what constitutes a small file and why Hadoop prefers larger files, how do you avoid the small file problem? In my next post, I will discuss solutions to the NameNode memory problem as well as some initial options for solving the MapReduce performance problem. In my third and final blog in this series, I will discuss additional solutions for the performance problem and how to choose the best solution for your situation.

This post was written by Dave Reinke and originally published on Wednesday, June 22, 2016

 

In a previous blog, we discussed the importance of tuning data lookups within Pentaho Data Integration (PDI) transformations.  We suggested that there were identifiable design patterns which can be exploited to improve the performance and stability of our code.  In this blog, we will investigate the most frequently used lookup pattern:  Key-based, Single Record Lookup.

 

This pattern’s classic use case involves an inbound stream of fact records queried from a source containing dimensional attributes in the form of business keys.  In order to load this data into the target fact table, the dimensional business keys must be converted into a set of technical keys referencing target dimension tables. The conversion of dimensional business to technical keys is done through a series of lookups.

 

The Database Lookup and Stream Lookup steps are used to implement this pattern.   Database Join could be used, but would be much less efficient since it does not support caching.   Additionally, we are not considering PDI’s purpose built Dimension Lookup/Update and Combination Lookup/Update steps as this pattern assumes no versioning of dimension data, i.e. no slowly-changing dimensions and that we have already loaded our dimension tables prior to the fact table load.  (I can already hear some of my colleagues’ arguments.   Yes, I know that Dimension Lookup/Update can be used for Type 1 dimensions and that its use enables the code to more easily migrate to Type 2, versioned, dimensions.  It is a candidate for this pattern, but requires specific database design changes for its use.   We’ll discuss Dimension Lookup/Update in a future blog.  I promise.)  So with that caveat, let’s examine the use of Database Lookup.

Using Database Lookup for Key-based, Single Record Lookups

Database Lookup is the prime choice for this pattern because it is easy to set up and provides rich functionality and tuning options.  However, there are two vital prerequisites for its use.  First, the lookup source must be a single database table.  Second, the lookup table must contain a unique key column, often a business key, and a technical key column, almost always the primary key. The following diagram shows an example of this step’s configuration.  It depicts a lookup of the technical product_key from the dim_product table in my_datamart database using a unique product_code business key field.

 

As configured, the step will generate and issue the following SQL statement to the my_datamart database:

select product_key from dim_product where product_code = ?

 

The ? is replaced by the value in the inbound stream’s product_code field.  

 

Since the step is configured to enable caching of lookup data, the SQL statement will be run only once for each distinct product_code value found within the inbound stream.  Additionally, we provide a default value of 0 should the lookup fail to find a match.  No-Match behavior and caching are both configurable.  Let’s discuss...

Handling No-Match Lookup Failure in the Database Lookup Step

In our example, a no-match would occur if an inbound stream record's product_code did not find a matching product_code in the dim_product database table. The step is configured not to fail, but rather to place a default value of 0 in the outbound stream's product_key field. The underlying assumption is that there exists a "null" or "unknown" dim_product database record with a 0 product_key for null or invalid product_code values. The following table describes the two ways a no-match can be handled by this step.

 

Action

Step Configuration

Processing Implications

Provide a Default Value

Depicted in the example.

A subsequent step in the transformation can be used to filter and/or operate on the defaulted value, perhaps writing an error record and sending an email to an administrator.

Filter the inbound record

Check the "Do not pass the row if the lookup fails" box

No error is raised and the row is not passed to the outbound stream. This prevents you from taking corrective action on the errant data but ensures that only valid matches are processed.

In practice, we rarely use the filter option as most data programs attempt to remediate and/or alert if invalid data appears.

 

Optimizing the Lookup Cache for the Database Lookup Step

Not implementing or optimizing the lookup cache is a frequent mistake made by new PDI developers. Fortunately, it’s simple to explain and set up!  The following table details the configuration options:

Cache Option

Step Configuration

Processing Implications

Cache All

Check “Enable Cache”, then check “Load all data from table”

A single SQL select statement is executed, retrieving all key and lookup column data from the database Lookup Table. The entire Lookup Table dataset must fit in PDI’s memory. No inbound stream records will be processed until this entire dataset is retrieved and cached.

Cache Some

Check “Enable Cache”, uncheck “Load all data from table”, then specify how big (in # of lookup records) the cache will be allowed to get via the “Cache Size in rows” field.

One SQL select statement will be executed per distinct inbound record key that is not already found in the cache. PDI memory must be able to hold the number of lookup rows specified.

No Cache

Uncheck “Enable Cache”

Every inbound record will cause a SQL select statement to be executed against the Lookup Table -- even if keys are repeated!
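
To make the cost difference concrete, here is a sketch of the queries the step ends up issuing under each option, based on the earlier dim_product example (the exact SQL PDI generates may differ slightly):

-- Cache All: a single query retrieves the entire lookup dataset up front
select product_code, product_key from dim_product

-- Cache Some / No Cache: one parameterized query per inbound key not already in the cache
select product_key from dim_product where product_code = ?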

 

So, when to use which option?   Here are some insights into our best practice decision-making...

Cache Option

When to Use

Cache All

  • The lookup table is not very big

Cache Some

  • The lookup table is very big, the inbound stream is not small and contains repeated lookup keys

  • The lookup table is big and the inbound stream is sorted on lookup keys (i.e. removing previously cached records from the active cache has no downstream penalty)

No Cache

  • The lookup table is very big and the inbound stream is very small

  • The lookup table is very big and the inbound stream contains very few (or no) repeated lookup keys

 

The most common configuration is Cache All. In general, if Cache All works efficiently, then use it. Most lookup tables are small (< 100,000 records) and thus can be retrieved quickly and will easily fit in memory. If full lookup dataset retrieval is inordinately slow or memory is limited, then consider Cache Some or No Cache depending on the size and lookup key distribution in your inbound stream.

Using Stream Lookup for Key-based, Single Record Lookups

Stream Lookup differs from Database Lookup in the following manner:

  • Its lookup dataset is sourced from a second inbound stream of records and is not tied directly to a single database table.

  • It always caches all lookup data -- there is no option for partial or no caching. 

Thus, Stream Lookup should be used in the following circumstances:

  • The lookup data is not sourced from the entirety of a single lookup table

    • You may want to use a subset of rows in a lookup table. In this case, you’d use a Table Input step to gather the subset and then stream that subset as the lookup data into the Stream Lookup step, or

    • the lookup data is sourced by joining two or more tables or streams.

  • You have multiple lookups against the same “not big” lookup table

    • You have role-playing dimension keys that relate a single fact record to the same dimension table multiple times. In this case, you’d use a Table Input step to query all of the lookup data and then use “copy” hops to stream that data to multiple Stream Lookup steps -- one per role.

  • You have a source of data that does not reside in a database table

    • E.g. a flat file, or a spreadsheet

 

Since the lookup data stream for Stream Lookup can be generated by any valid sequence of PDI steps, the alternatives and examples for its use are somewhat boundless.

 

Also note that Stream Lookup does not provide a filter record option for “no-match” conditions.   If you want to rid your stream of “no-match” records you have to use a Filter step after the Stream Lookup.

 

As you can see, there is tremendous opportunity to tune (or mistune!) lookups when doing a simple key-based, single record lookup.   Next in our series, I’ll examine a similar pattern but with a subtle twist:  Looking up the Most Recent Record.

This post was written by Dave Reinke and originally published on Wednesday, July 6, 2016

 

As we continue our series of Pentaho Data Integration (PDI) Lookup Patterns, we next discuss best practice options for looking up the “most recent record”. Common use cases for this pattern include finding the most recent order for a customer, the last interaction of a web site visitor, and the last claim record for a policy. As you’d expect there are multiple ways in which this can be done within PDI.

 

We’ll explore two of the most frequently used options, but first let’s define an example use case.

Example Use Case

Suppose we have a stream of customers, each identified by a customer_key, and an orders table that contains the following columns:

 

COLUMN

DESCRIPTION

order_key

Primary key of the order. The column we want to look up.

customer_key

Foreign key to customers. The column we need to match with our input stream records.

order_timestamp

The timestamp of the order.  We’ll use this to determine the most recent record for a given customer_key.

Other columns

There may be many other columns which we could include in our lookup.   We will ignore these in order to de-clutter our example.

 

If you had customer_key 100 and wanted to get the most recent order_key, you would issue a query like the following:

 

select order_key
from orders
where customer_key = 100
order by order_timestamp desc
limit 1

 

The database would return exactly one order_key (limit 1) for the given customer_key (100) chosen as the first of a list sorted by the order_timestamp descending. In PDI, we can use the Database Lookup and Database Join steps to accomplish the same result.

Approach 1: Database Lookup

As previously discussed, the Database Lookup step is most often used for key-based, single record lookups; however, it is also well suited to the Most Recent Record lookup pattern. The following picture demonstrates how this step would be configured for our example.

 

 

The step looks up the order_key of the most recent record in the order_db.orders table for each input stream record’s customer_key. The Order by field tells PDI to sort the matching orders records by order_timestamp in descending order. By design, the step will only return one row -- the first row returned after the sort.   So, we have effectively configured the step to execute a query similar to the one listed above.

 

As configured, our example will execute a single lookup query for each input stream record. If customer_keys are repeated in the input stream, this will result in many redundant query executions. To eliminate these redundant queries, caching can be configured for Database Lookup. Database Lookup can offer caching because it restricts the lookup query to a single database table. The performance boost can be quite significant. In our example, suppose there are 1,000,000 input stream records containing 55,000 distinct customer_keys. Without caching, the step will execute 1,000,000 lookup queries. If we check the Enable Cache? box, the step will only fire 55,000 lookup queries -- one per distinct customer_key. This results in an order of magnitude or more of improved performance.

Approach 2: Database Join

The Database Join step can also be configured to implement the Most Recent Record lookup pattern. The following picture shows its configuration for our orders example.

 

 

When using the Database Join step, you must supply your own lookup SQL. In our simple example, this is a straightforward parameterized select statement that returns order_keys for a given customer_key in descending timestamp order -- most recent record first. We enter 1 in the Number of rows to return field to ensure that only the first record for each input stream record is passed to the output stream. We check Outer join? to allow a failed lookup record (i.e. no matching orders for a customer_key) to pass with a null order_key in its output stream record.
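
For reference, the lookup SQL entered in the step would look something like the following sketch (the ? is bound to the inbound customer_key; the Number of rows to return setting takes the place of a limit clause):

select order_key
from orders
where customer_key = ?
order by order_timestamp desc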

 

Performance-wise, the Database Join step will take about as long to process as a non-cached Database Lookup step. There will be one query per input stream record. Unlike Database Lookup, there is no ability to cache data to improve performance. However, because Database Join allows you to define your own SQL Select statement, you have the ability to define arbitrarily complex lookup queries.

Which Approach to Use?

So, how do you decide which step to choose? The answer is rather straightforward. Use the Database Lookup step if you have a single lookup table with a straightforward lookup condition. Leverage caching if your input datastream contains duplicate lookup keys. Only use Database Join if your lookup query is complex. Remarkably, in the field, we see developers turning to Database Join without realizing that Database Lookup can be leveraged to improve performance. Don’t make that mistake!

 

One final comment: the savvy reader will note that this same pattern applies to “least recent record” or historically first record lookups as well. In fact, any arbitrary record sort that you might apply can be handled by this design pattern.

Next in our series, I’ll examine the Master-Detail explosion pattern, which brings the Database Join step to the fore. Visit our website in the next week or so, or follow us on Twitter @inquidia or Facebook to tune in to the rest of this series.

This post was written by Chris Deptula and originally published on Wednesday, January 28, 2015

 

With an immutable file system and no update command, how do you perform updates in Hadoop?   This problem occurs in just about every Hadoop project.   Yes, the most common use case of Hadoop data ingestion is to append new sets of event-based and/or sub-transactional data.   However, for analytics, you often have to join these events with reference data (hereafter referred to as “dimension data”) -- data that can and usually does change over time.

 

Imagine you are collecting website activities through an intelligent event tracking system.  The events are generated as JSON datums.  You’ll likely have some automated collection mechanism that uses streaming or micro-batching ingestion into HDFS.  Simple enough, just append the new data.   Now your business users want to analyze these events by various web user attributes -- attributes that you maintain in your registration systems.   You often update registration attributes via third party data append services and from direct updates by the registrants.  You need a way to maintain a current (perhaps historically versioned) copy of the registrant data in HDFS.  You need updates in Hadoop.

 

Performing data updates in Hadoop is hard.  On a recent project and after a long day of Pig scripting, one of my colleagues said, “My head hurts.  I have to think about things I never had to think about when using databases.”   His experience maintaining dimension data using mature ETL tools and RDBMS technology was great, but now that experience had to translate to the data programming capabilities in Hadoop.   Comparatively speaking, data programming in Hadoop is, how do we put this nicely, still maturing.

 

If you want to implement “upsert” (i.e. merge) logic or (gasp) Type 2 dimension logic, you have to build it yourself.   HDFS files are immutable, meaning you cannot edit a file after it has been written. Hive has only very recently added an update command, and Pig does not (yet) provide one, much less a merge statement. Some day they will, but for now you must develop your own algorithms.   Fortunately, there are some patterns which can be applied.

 

In this blog, we’ll discuss four common approaches:  Hybrid Update, HBase Dimension Store, Merge and Compact Update, and “Good Enough” Update.  There is a fifth, less common approach involving the creation of a custom access layer that resolves updates at read time.  This is often reserved for extremely high-volume data, where the cost of batch-processing all of the data outweighs the cost of transforming the data on demand.  We won’t cover this last approach.

 

Hybrid Update

The Hybrid Update approach uses mature ETL and SQL programming to maintain dimensional data within an RDBMS, then periodically copies the data, likely using Sqoop, into HDFS, replacing the previous version.  All of the hard work to handle special update rules, slowly changing dimensions (i.e. versioning), surrogate key assignment, etc. can be maintained using highly productive ETL programming languages.   The Hadoop “update” becomes a file replacement.   Given that most companies have already invested in a data warehouse to maintain dimensional data, this approach is easiest to implement and often the first adopted.

 

However, there are downsides.  First, the approach is not scalable.  Suppose you have a billion record dimension table.  With this approach you’d extract a billion records from your RDBMS each time you need to update the dimension data in Hadoop.  There would clearly be performance and capacity implications!  Even if you currently have manageably sized dimensions, as data volume grows, your load performance will degrade. 

 

Second, this approach requires orchestration between processes running in different environments.   You have to make sure that your RDBMS ETL completes before your extract process starts.   The latency requirements for your Hadoop-based analysis are wholly dependent upon the timeliness and success of your RDBMS load processing.  You could use an ETL technology like Pentaho Data Integration to orchestrate, but your load windows may shrink and exception handling could be complex.

 

In the end, if all of your dimensions are relatively small with slow growth, then this approach may be a good fit.  Otherwise, we’d only recommend it as a transitional step before embarking on one of the following Hadoop-centric algorithms.

 

HBase Dimension Store

HBase is a NoSQL database that runs on Hadoop.  Since it provides native support for updates, it is often considered for dimension data maintenance.  In fact, many ETL tools contain pre-built steps to stream inserts, updates and deletes into HBase.  You also can ingest “real-time” streams of high velocity data into HBase, minimizing update latency.  And, HBase tables can be queried via SQL on Hadoop options like Hive and Impala with properly configured serdes.   All this makes HBase seductively attractive, but, be warned, there are significant performance challenges when using HBase tables for analytical queries.
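
As a sketch of what the SQL exposure might look like, a Hive external table can be mapped onto an HBase dimension table via the HBase storage handler (the table, column family and column names below are hypothetical):

create external table dim_customer_hbase (
  customer_key string,
  customer_name string,
  customer_segment string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties ('hbase.columns.mapping' = ':key,d:customer_name,d:customer_segment')
tblproperties ('hbase.table.name' = 'dim_customer');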

 

Querying HBase data works best when a single row or a range of rows is selected.   If the query must scan the entire table (as is often the case for analytical queries), HBase is quite inefficient.   In our labs, we set up a small experiment using Impala to contrast performance between a file-based dimension table versus an HBase version.  We created several aggregating “group by” queries that joined our dimension tables to a large, file-based fact table.  In all cases, the file-based dimension table outperformed the HBase version.

 

If you are 1) already using HBase, 2) comfortable maintaining and balancing its resource needs with the other services running on your Hadoop cluster and 3) able to ensure that your HBase queries always return a single row or a range of rows, then an HBase dimension store is a good solution to the update problem.  Otherwise, we’d recommend you consider one of the next two alternatives.

 

Merge and Compact Update

Unlike the Hybrid and HBase approaches, this update strategy leverages an algorithm rather than a specific technology. The algorithm can be implemented with Java MapReduce, Pig, Hive, Spark, Cascading, Pentaho Visual MapReduce or any other Hadoop data programming language.  It consists of the following steps:

 

1. Maintain a Current Copy of the Master Data:  A complete copy of your dimension data must reside in HDFS.  For clarity’s sake we’ll call this the “master” data.  This requires a one-time, initial load of the data.  Check out the Inquidia Hadoop Datamart Importer (HDI) for an automated way to do this.

 

2. Load the Delta Data: Load the newly updated data into HDFS.  We’ll call this your “delta” data.

 

3. Merge the data:  Join the master and delta data together on the business key field(s).

 

4. Compact the data: After the merge you will have one or more records for each business key.  In the compaction, you apply logic that emits only the record (or records) that should survive for each business key.  In the simplest case, you will emit one record having the maximum update timestamp.   However, if you need to maintain record versions (e.g. type 2 dimensions) then your logic will emit a set of versioned records with non-overlapping effectivity dates.  There may be additional complexity in your compaction process if you have to apply special business logic.  For example, you may never want to update some fields or only update a field when some condition occurs.

 

5. Write the Data to a Temporary Output: Since most Hadoop jobs (e.g. MapReduce) cannot overwrite an existing directory, you must write the compaction results to a temporary output.  The temporary output is your new dimension table containing a merge of new, updated and unchanged records.

 

6. Overwrite the Original Master Data: Move the temporary output to the location of the original master data, overwriting the previous version.

 

This Hortonworks blog illustrates a simple example of this approach using Hive.  It works when there are no complex business rules for the compaction.   If you have business rules that cannot be designed using SQL clauses, you may be limited in Hive.
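
As a minimal sketch of the merge-and-compact logic in Hive (the dim_customer master and dim_customer_delta tables and their columns are hypothetical, and the compaction here simply keeps the most recent record per business key):

insert overwrite table dim_customer_tmp
select customer_id, customer_name, customer_segment, update_ts
from (
  select customer_id, customer_name, customer_segment, update_ts,
         row_number() over (partition by customer_id order by update_ts desc) as rn
  from (
    select customer_id, customer_name, customer_segment, update_ts from dim_customer
    union all
    select customer_id, customer_name, customer_segment, update_ts from dim_customer_delta
  ) merged
) ranked
where rn = 1;

The contents of dim_customer_tmp would then replace the original master data, per steps 5 and 6 above.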

 

The primary downside of this approach is that you must read and process the entire dimension table every time you perform an update -- even if only one record is changed.  If your dimension table is 2TB on disk, then you are reading and writing 2TB to perform the update.  Fortunately, there is a way around this issue...

 

“Good Enough” Update

If you have extremely large dimensions that cannot be processed within your batch window, then you either have to buy a bigger cluster or come up with another approach.  This is where the “good enough” update is applied.

 

At its heart is an intelligent partitioning strategy whereby you organize your dimension data so that you isolate records which are most likely to be updated into a small subset of the partitions.  This is often done using a record creation or last update date and assuming that recently inserted records are more likely to be updated than older records.  (For type 2 dimensions, you would also isolate the “current” version of each record since older versions are considered immutable.)  Within HDFS, you’d store each partition in a sub-folder of a parent dimension folder.   A Hive table can be defined to recognize these partitions, which might also improve query performance.
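
A sketch of such a partitioning scheme in Hive might look like the following (table, column and partition names are hypothetical); the merge-and-compact job is then pointed only at the partitions most likely to contain changes:

create table dim_customer_part (
  customer_id string,
  customer_name string,
  customer_segment string,
  update_ts timestamp
)
partitioned by (created_month string)
stored as orc;

-- re-process only the most recent partition(s); compacted_recent_customers is a
-- hypothetical intermediate result of merging the delta data with these partitions
insert overwrite table dim_customer_part partition (created_month = '2015-01')
select customer_id, customer_name, customer_segment, update_ts
from compacted_recent_customers;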

 

With this partitioning strategy in place, you simply perform the Merge and Compact Update on the subset of data considered most likely to change.   You avoid processing the entire dataset, but at the risk of inserting redundant dimension records.  The redundant dimension records are created when an updated delta record cannot find its previous version in the partition subset.

 

Of course, the potential to have more than one dimension record for the same dimension key means that query results which include that key might be wrong.   This may persist and potentially compound unless you augment this approach with a periodic merge and compact update on the complete dataset.   We have seen this strategy applied where good enough updates are executed throughout the week and then a full update is run over the weekend.  We have also seen organizations run data quality queries looking for duplicate keys and only running merge and compact jobs if an issue is detected.   In each case, there is a period of potential data inaccuracy which eventually is resolved.

 

The Good Enough Update approach should only be used when you cannot feasibly execute the full Merge and Compact Update and when there is a tolerance for small data errors.    Given the vagaries of event/sub-transactional data collection, most Hadoop-based analytical use cases already have such a tolerance.

 

Picking an Update Strategy

As you’d expect, one size does not fit all.   You will likely utilize more than one Hadoop update approach.   So, how do you decide which approach is right for which circumstance?    There are four major (and a whole host of minor) areas for consideration.   First, what is the size of your dimension? Are you dealing with 100’s, 1000’s, … 1,000,000’s of records?   Second, what is your latency requirement?  Do you have a batch window or do you need to see updates occur as changes “stream” in?   Third, how much data volatility exists?   Is this dimension fairly stable with new/changed records trickling in or do you see massive “delta” files with nearly every record being touched?   Fourth, how complex are your update rules?  Is it as simple as overriding the previous record with a delta record or are you implementing type 2 dimension logic with additional update rule dependencies?   The following table considers these factors for each algorithm.

 

 

In general, Hybrid is useful for small dimensions that have complex update rules.  HBase is best when latency is a major factor and update rules are simple.   Merge and Compact is the most versatile and often used but has scalability and latency limits.  Good Enough helps solve the Merge and Compact issues but at the cost of potentially introducing data quality issues.

 

The Future

Hortonworks has added batch insert, update, and delete capabilities in Hive as part of the Stinger.next initiative.  These capabilities are designed for large scale inserts, updates, and deletes as opposed to single record transactions.  Too many transactions over a period of time can lead to a drop in performance.  Additionally, ACID transactions in Hive currently only work with the ORC file format and are a very new, immature feature.  As this capability matures we expect it to become an attractive and widely used option.
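
For illustration only, Hive's ACID syntax looks something like the following sketch (prerequisites vary by Hive version: the table must be bucketed, stored as ORC and flagged transactional, and the required server and session settings are not shown; the table and values are hypothetical):

create table dim_customer_acid (
  customer_id string,
  customer_name string,
  customer_segment string
)
clustered by (customer_id) into 8 buckets
stored as orc
tblproperties ('transactional' = 'true');

update dim_customer_acid
set customer_segment = 'premium'
where customer_id = 'C100';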

 

Save the date: Pentaho User Meeting 2018 for the DACH region (Germany, Austria, Switzerland) will take place on March 6, 2018!

 

Expect a full day of talks, networking and exchanging ideas starting at 10am and ending at 6pm.

 

German Federal Police, CERN and Netfonds will share their use cases with us, and Pedro, Jens and Matt will present hot stuff from Pentaho 8.0.

 

Call for Papers

We are looking for users to present their projects! Send your proposal to Stefan at stefan.mueller@it-novum.com

Talks are 30 minutes long including questions and answers; languages are German and English.

 

Agenda

Agenda and more details can be found on the event page

 

Looking forward to seeing you in Frankfurt on March 6!

"IoT Hello World"

 

 

 

It was brought to my attention at PCM2017 in November that I still owe the third blog on using the MQTT plugin for Pentaho Data Integration (PDI). So, thanks for reminding me. For those that haven’t been keeping track, there were two other blogs written earlier this year introducing the MQTT plugin in Pentaho + MQTT = IoT and applying some security techniques to MQTT in Securing IoT Data with Pentaho’s MQTT Plugin. So, to close out this three-blog series, let’s kick it up a bit. I would like to describe a small project I’ve been working on that uses PDI and MQTT in a bi-directional communication configuration. This means using both the MQTT Publisher and MQTT Subscriber steps within the same transformation. I have tweeted snippets of this project throughout the year as I developed it, and now I’ll explain how it works. Refer to Diagram-1, which shows the full architecture of this project, which I call “IoT Hello World”. I use this project for demonstrations, education and as a way to generate streaming IoT data for participants in the #hitachicode1 hack-a-thon events.

 

ArchitectureDiagram.png

Diagram-1: The overall architecture of project "Pentaho - IoT Hello World"

 

Project Overview

 

A quick description of what this project does before I explain how it works. This is a 6 degree of freedom (DoF) robotic arm that performs several robotic routines - displaying "Hello" & "World", then picking up and placing a car on the track (refer to the video above). While looping through these routines, components of the robotic arm, the servo motors, will begin to heat up. Temperature sensors have been attached to all 6 servo motors so that the temperature of the servo motors can be sensed and reported. The data stream that is generated from this data is published in comma-separated values (csv) format and includes:

 

  • Robotic Arm Controller (RAC) serial number (read from a configuration file)
  • A RAC system timestamp
  • The "Shoulder" servo motor temperature
  • The "Twist" servo motor temperature
  • The "Wrist" servo motor temperature
  • The "Elbow" servo motor temperature
  • The "Base" servo motor temperature
  • The "Claw" servo motor temperature
  • An MD5 hash of the first 8 fields of this message
  • The MQTT Topic that this message is published under

 

An example MQTT message, published under the topic "Pen/Robot/Temp", looks like this:

 

SN84112-2,2017-07-26 13:57:28,26.0,25.0,25.0,27.0,24.0,24.0,6c18c1515dea2c5ddfcc6c69a18cbedf,Pen/Robot/Temp

 

There is another message that is published by the RAC during startup. When the RAC is booted and the "Start" button is pushed, a registration message is published. This message "registers" the RAC with the Corporate Server Application, which stores it in the device registration table. The message consists of the following fields in csv format:

 

  • RAC serial number (read from a configuration file)
  • A RAC description (read from a configuration file)
  • A RAC system timestamp
  • IP address of the RAC
  • The node name of the RAC
  • The PDI software version
  • Latitude
  • Longitude

 

An example of the device registration MQTT message, published under the topic "Pen/Robot/DeviceRegister", looks like this:

 

SN84112-2,SN84112-2 Robot Arm Hello World 6 DOF 6 Servos,2017-07-26 13:33:50.138,192.168.1.184,ironman,
6.1.0.1-196,30.6291065,-100.995013667

 

Again, refer to Diagram-1. The four main sections of this project, with the fourth being optional, are:

 

  1. The “Corporate Server Application”, which subscribes to and publishes on several message queues. It:
    1. Subscribes to the "Device Registration" message queue to receive a one time registration information message when the RAC comes online
    2. Subscribes to the "Device Data Stream" message queue for the operational information from the RAC
    3. Publishes the response to the "Corporate Response" messages queue about corporate operating temperature specifications
    4. Publishes a message stream to the "Mobile Stream" message queue for a mobile application to monitor the robot arm remotely
  2. The Robot Controller Application and Sensor Monitoring, which registers the RAC with the Corporate Server, collects sensor data and other controller information, and manages all the control buttons, LED indicators and the robotic arm itself.
  3. The MQTT Broker, which is a free MQTT Broker at iot.eclipse.org (but any MQTT Broker can be used), for hosting the four message queues used in this system. The four message queues are defined as:
    1. Device Data Stream – used for the device data stream
    2. Device Registration – used for the device registration message
    3. Corp Response – used for the corporate server app to send messages back to the robot arm controller.
    4. Mobile Stream – a dedicated message stream for the mobile device app from the corporate server app
  4. A fourth component is a mobile app for remote monitoring. This is not required and the system will run with or without the mobile device present.

 

As mentioned earlier, what’s happening with this system is that while the robot arm is running its routines, the servo motors will begin to heat up. The sensors attached to each servo motor are read continuously, and a status LED associated with the temperature readings provides a local visual status. This is called the “Local Vendor Temperature Status” and it covers a temperature range that corresponds to Green (OK), Yellow (WARNING) and Red (OVERHEAT) LEDs. The other set of status LEDs is called the “Corporate Temperature Specifications”. The Corporate Server Application is subscribing to the RAC’s data stream in real time and responding based on a different definition of Green (OK), Yellow (WARNING) and Red (OVERHEAT) conditions. Basically, the corporate temperature specifications are lower than the vendor temperature specifications.

 

I call this the “fishing boat situation”. If you've ever asked a fishing boat captain if the boat will go faster, they will respond with “...yes, the vessel is designed to go faster, but by running the engines at half speed, the engines will last twice as long”. This is the same situation here. The robot arm vendor will say that the robot can run at a higher temperature, but the owner of the robot arm (Corporation ABC) wants to operate the robot arm at a lower temperature, or in this case, indicate that the robot arm is operating at a higher temperature than they want it to. This, of course, is just a simulated scenario in order to tell the story of what’s going on.

 

 

The Robot Arm Controller

RobotArmController.png

The robotic arm controller (RAC) and monitoring application is a collection of orchestrated PDI transformations that execute through a job, all running on a Raspberry Pi. These PDI components do the following:

 

  1. Registers the robot with the current GPS location using a GPS module installed on the RPi, various system information, the device’s serial number and device description. This registration information is formatted and published to the “Device Registration” message queue.
  2. The Data Collection component monitors the 6 temperature sensors installed on the RAC's servo motors and connected to the RPi. This component publishes the formatted message to the “Device Data Stream” message queue.
  3. Subscribes to the “Corp Response” message queue to receive the current corporate operating temperature specifications.
  4. PDI also orchestrates a suite of python programs that are used to do the physical IO associated with the RAC, such as:
    1. Turn LEDs indicators on and off
    2. Collect temperature readings from the temperature sensors
    3. Interface with the control buttons
    4. Collect the latitude and longitude data from the GPS module

 

SomePythonCode.png

All the physical general purpose IO (GPIO) connections for LEDs, sensors, GPS coordinates, and robotic arm manipulation and control are performed through various python programs executed from PDI. An example python program for reading the 6 temperature sensors and outputting a list of temperatures is shown above. I picked this code sample to show you because it reads all the sensors in parallel versus sequentially. Each sensor takes several seconds to read and the time adds up if read in sequence.

 

 

The Corporate Server

 

The Corporate Server plays the role of maintaining the company’s operational specification on preferred temperature levels versus the vendor's operating specification. Let’s take a closer look at the transformation running the Corporate Server. In the transformation shown below (Transformation-1), we can see that both the MQTT Subscriber and MQTT Publisher steps are used simultaneously within the same transformation.

 

From left to right, the transformation subscribes to the topic “Pen/Robot/Temp” at the MQTT Broker. It then copies the message stream into four threads.

 

  1. This thread goes to another transformation that builds another message and publishes it to the "Mobile Device" message queue.
  2. This thread splits the message into fields and updates a table in the database (the commit size is set to 1 so every record is immediately committed).
  3. This thread also gets a copy of the message, splits out the temperature values and does some calculations on the temperatures from the RAC's temperature sensors. This value is then used in the Switch/Case step to compare it to a table of corporate-defined temperature values. Based on the result, an MQTT message is published to the “Corp Response” queue on the MQTT Broker with a Green (OK), Yellow (WARNING) or Red (OVERHEAT) status. This status is then used by the RAC to light the corporate LED indicators. This is what I’m referring to as bi-directional communication or full-duplex communication using MQTT. The transformation is subscribing to a topic and receiving a message, then within the same transformation publishing a message.
  4. This thread simply dumps the raw messages straight into a local log file for posterity.

 

CorpServerTransformation.png

Transformation-1: The main transformation for the Corporate Server Application.

 

There is a second (floating) transformation within this main transformation for catching the Device Registration information when the RAC is started. This should work when there are multiple controllers using this system. In fact, the mobile device stream is a join of the Device Registration table (to get the GPS latitude and longitude and the robot arm controller description information) and the table storing the temperature readings already stored in the database.

 

That's it! The entire process on the RAC is set up when the Raspberry Pi boots up. The push button interface kicks off scripts that launch kitchen to start the RAC application. The Corporate Server Application is started with a script that starts pan.

 

This is a great project that has real IoT demonstration value, plus it was just a blast to do. It is also portable so I can travel with it as necessary. I have started on a second version of this robot concept with many more sensors, more complicated routines and other updated components. I am looking forward to showing that project some day soon.

 

Let me know if you have any questions or comments. I know this project is less industrial and more hobby-ish in nature, but it is a great tool for demonstrating complex concepts with Pentaho and generating real IoT data streams.

Back when I took the Johns Hopkins Data Science track on Coursera, one of my homework assignments for the Developing Data Products course was to create a dynamic tool using R and Shiny that would graphically demonstrate the Central Limit Theorem (CLT).  The CLT says that if you take the means of groups of random numbers, then the means will be approximately normally distributed -- no matter what the underlying distribution of random numbers looks like -- with the approximation improving as the group size grows.  The finished assignment included a histogram that reacted dynamically to user input including the number of random numbers to generate, how many per group, and so on.  The Shiny application would detect user input and dynamically update the histograms in R.

 

A couple of weeks ago, I wrote about implementing histograms using PDI and CTools: Creating CDE Histograms using PDI and a User Defined Java Class.  If we want a CTools histogram to be dynamic in the sense of my R/Shiny homework assignment, then we need to send parameters from the CTools dashboard to the kettle transformation.  Fortunately this is easy to do.  The following example was produced using the Pentaho BA Server 7.1, and the complete source is attached.  To see it work, simply upload the attached CentralLimitTheorem zip file to an empty folder in the BA Server.

 

The first step is to rework the histogram kettle transform from last week's example slightly.  To demonstrate the CLT, we'll use only the exponentially distributed random numbers, and so we'll remove the rest.  Next, we'll want to add group structure to the random number generation.  We can do this as shown in Figure 1 by adding an incrementing row id using a Sequence step, adding a group number (= row id modulo number of rows in a group) using a Calculator step, and finally emitting a group average using the Group By step.  Also, we use a Get Variables step and an Add Constants step to define histogram dimensions from user input instead of a Data Grid.

Figure 1: Taking average over groups of rows using Sequence, Calculator, and Group By steps.

 

Next, we want to create a dashboard that uses this transformation.  The basics of how to do this are described in my blog post from last week, but briefly, the new kettle transformation is uploaded through the Pentaho User Console, a new CDE dashboard is created, and a kettleTransFromFile datasource is created pointing to the new kettle transform.  The major addition here is that in the current example, we want CTools to pass user input back to the kettle transform and update the histogram in response.  To do this, we need

  1. to configure the datasource to accept parameters and map them to kettle transformation input parameters,
  2. to create user changeable parameters in the dashboard,
  3. to have the chart update when the user parameters change

There are several places where parameters have to be defined, and these are demonstrated in the following.

 

Kettle Datasource Configuration with Parameters

 

Let's consider the kettleTransFromFile datasource.  In the example, the transformation is called t_Central_Limit_Theorem.ktr and the output step with the histogram is "out: 1".  IMPORTANT: Make sure Cache is set to false, since the chart is to be interactive.  The datasource here is describing a CDA datasource with its own parameters and defaults, and these need to be mapped to the kettle transformation input parameters.  This is done by clicking on the "Variables" item in the datasource properties (See Figure 2).  The entries on the left under the "Arg" title are the CDA parameters.  These are the parameters that the chart components use when they pull data down from a datasource, and the entries on the right under the "Value" title are the corresponding kettle input parameters defined in the transformation.

Figure 2: The datasource Variables dialog in CDE.

 

If an entry appears in the Variables table, then a value can be passed from the dashboard to the kettle transform.  If an entry does not appear in this table, then the dashboard cannot see that kettle input parameter, and the default value defined in the kettle transform will be used.

 

Next, the data type and default value need to be specified for each CDA parameter.  This is done by clicking on the "Parameters" item in the datasource configuration list (See Figure 3).

Figure 3: The datasource Parameters dialog in CDE.

 

Note that these are "CDA" parameters and defaults - you must use the same name here that you used in the "Arg" column of the Variables configuration table. And the default value given here is what will be passed to the CDQ query (and ultimately to the kettle transformation) if the dashboard does not specify it.

 

Creating Dashboard Parameters and Inputs

 

In our example, we want the user to be able to change each parameter in the CDA.  They are going to be used by multiple components, including input text boxes and one or more CCC charting components, so these parameters live in the dashboard.  In the components tab, add a parameter for each and name them accordingly.

 

Figure 4: Parameters defined in the Components tab.

 

In order to allow the user to see and change the values of these parameters, some kind of input control must be chosen.  For the example, I'm using simple Text Input Component (text box) inputs, but other input controls like dropdown lists, date ranges and buttons are available as well. Each input should be mapped to a parameter via the properties of the input (see Figure 5), and these will come from the pool of dashboard parameters.  In fact, if you start typing the Parameter name, the CDE will pull up a context menu of prompts drawn from the list of dashboard parameters that have been created so far.

 

Figure 5: Mapping parameters to Text Input Components.

 

 

Adding Charting Components

 

This is where it gets really interesting.  We're going to work with a bar chart again since we're making histograms.  In order for the histogram to change in response to user input, we have to do two things: we need to tell the chart component to react to changes in a dashboard parameter and we need to tell the chart component how to update from its datasource.  That's done in the Listeners and Parameters dialogs respectively of the charting component configuration shown in Figure 6.

Figure 6: The configuration section for a Community Bar Chart component.

 

To have the chart react to parameter changes, click on the Listeners dialog.  A list of available dashboard parameters will be presented with checkboxes.  Simply check the parameters of interest.  If a parameter is not checked as a listener, then the component will not update when that parameter changes.

 

What does it mean for a chart to react to a parameter change?  It means that it refreshes its data from its datasource, and so it needs to know how to map the dashboard parameters to the cda parameters in its datasource.  This is done by clicking on the Parameters dialog, and shown in Figure 7.

Figure 7: The Parameters dialog box of a Community Chart Component.

 

When updating in reaction to a parameter change, the chart component will use the values of the parameters on the right for the CDA datasource arguments on the left.  Note: if a parameter is not listed here, then CDA will use the default value from figure 3.  Also, it is not necessary to use a dashboard parameter here.  A constant value could be used instead if, for instance, the chart has some default value that is not shared by other chart components.

 

Tying it all Together

 

So when we look at the final product, we have the result shown in Figure 8.  Back to the Central Limit Theorem, the width of the resulting normal distribution of means is inversely related to the square root of the number of entries in each group, so it is useful to be able to change the histogram definition to track changes in the other input parameters.  Note that the user can also see the underlying exponential distribution by simply setting the group size to 1.  Lots of interactive investigations are possible with this; the whole power of PDI is behind it!
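
More precisely, if the underlying random numbers have standard deviation sigma, the standard deviation of the group means is approximately sigma / sqrt(n), where n is the group size -- so quadrupling the group size should roughly halve the width of the histogram.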

 

In closing, a note about the parameter mappings we encountered above.  It should be noted that CTools is a collection of distinct technologies: CDA (Community Data Access) which gets data from backend sources like a database or a PDI transformation and returns it in a usable format, CCC (Community Charting Components) which displays graphical information, and the CDF (Community Dashboard Framework) which provides the backbone that knits components together.  The parameter mappings fundamentally exist at the boundaries of these technologies and reflect the immense flexibility of CTools.  The ability to define mappings and defaults at every level makes possible a rich variety of dashboard configurations. For example, to enforce a common parameter value across all dashboard components, simply specify its default in CDA and leave it out of the component mappings.  Or to change the user experience of the current example and have the chart update in response to a button change rather than react to each parameter change individually, simply add a button and make the histogram listen only to the button.

 

UPDATED 12/11/2017: The layout of the attached example site (Central Limit Theorem.zip) was updated using some best practices that I learned about at the CT1000: CTools Fundamentals online course last week.

Histograms are a great way to probe density functions.  Visually they look like ordinary bar charts, but the bars (or bins in histogram-speak) are always quantitative and ordered rather than qualitative.  A histogram is defined by n mutually exclusive bins covering a continuous region of possible measure values from low to high.  An instance of a histogram is specified by a value associated with each bin, plus a value for underflows and another value for overflows.  The value in each bin represents the number of times a measure value fell in that bin, the underflow represents the number of times a measure value fell below the low edge of the first bin, and the overflow represents the number of times a measure value fell above the high edge of the last bin.  Observations might be weighted, in which case the values represent the sum of the weights.
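
Concretely, assuming equal-width bins (as in the examples below), a measure value x with low <= x < high lands in bin number floor(n * (x - low) / (high - low)), counting bins from 0; values below low add to the underflow and values at or above high add to the overflow.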

 

In my last blog post, I demonstrated using a User Defined Java Class to measure the time between different workflow steps when a workflow is ill defined or irregular.  In this blog post, we'll explore creating a User Defined Java Class in a transformation t_histogram_example.ktr to create and fill histograms defined in an Info Step. Then we'll see how these can be used to drive a CCC Bar Chart in a dashboard created with the Community Dashboard Editor (CDE).  In the current example, we'll use a data grid to bring in hard-coded histogram definitions, but these could come in from a configuration database as well.

 

 

Figure 1: In this example, the info step is a Data Grid providing hard-coded histogram definitions, each containing an Integer ID, a name, a low specification, a high specification, a number of bins, and an optional field name containing a fill weight for the histogram (default is 1.0).   

 

The measure values that we will be looking at are random numbers generated according to several simple distributions which can be generated with inverse methods: uniform, Gaussian (or Normal), exponential, and logistic.  (See http://luc.devroye.org/handbooksimulation1.pdf for more information on how these were generated.)  The input parameters of the transformation define the ranges of these random numbers: a low and high for the uniform range, a mean and standard deviation for each Gaussian, and a scale factor for the exponential distribution.  These measures come in as data rows.
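
For example, the exponential variates can be produced with the standard inverse-CDF construction: if u is a uniform random number on (0, 1), then x = -b * ln(u) is exponentially distributed with scale factor b.  The other distributions follow the same idea using their own inverse CDFs.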

 

During initialization of the User Defined Java Class, we get the row set containing histogram definitions using the method findInfoRowStep().  Next, we loop over the info step rows and create the histograms.  The histograms are implemented in a Java static nested class, and used with the following methods.

  • fill(x): Increments the value of the bin corresponding to x by 1.0
  • fill(x, w): Increments the value of the bin corresponding to x by the number w.
  • getBins(): returns all of the bin values in an array.

The name field is used to identify a measure in the data rows, which is used to fill the corresponding histogram.  If a weight field is specified, then the fill(x, w) method is used with the value w taken from the weight field.  Otherwise, the fill(x) method is used.

 

The User Defined Java Class accumulates data until there are no more data rows detected.  At this point it dumps all histograms to the output.  Each row contains the histogram ID, the measure, the bin number, and the bin value.   In the example, we split the output rows by histogram ID and send to different dummy steps that can be referenced later.

 

Figure 2: Output is split by histogram ID.

 

The output of the first Gaussian using the step preview is shown in Figure 3.

 

Figure 3: The output of a histogram containing a standard normal distribution with 11 bins from -5.5 to +5.5.

 

The following assumes that a Pentaho Business Analytics server is installed and running with the Community Dashboard Editor enabled.  This example was done on version 7.1, and instructions can be found here: 7.1 - Pentaho Documentation .


To create a visual using the CDE, log into the Pentaho User Console (PUC) and create a new folder.  In this example, the folder is called "/home/Histograms".  Into this folder, upload the transformation t_histogram_example.ktr, and create a new CDE dashboard.  In the data sources panel, create a new "kettleTransFromFile" data source from the Kettle Queries tab.  This type of datasource will silently create a Community Data Access (CDA) query which will be the conduit between the PDI transformation and the CDE dashboard.  To configure the datasource, specify

  • A unique name for the datasource
  • The PDI transformation file having the histograms that was uploaded through the PUC
  • The PDI step that contains the histogram of interest, in this case out: 2.
  • Output Columns should be set up to the bin number and the bin value.  The CCC Bar Chart is expecting two input columns: the first should be the X-axis categories of the bar chart, and the second should be the bin values.  The structure of the output can be seen from Figure 3, and counting from 0, these are the 3rd and 6th columns.

The rest of the options should be OK from defaults.

 

Figure 4: Options needed for the kettleTransFromFile to set up a CCC Bar Chart.

 

In the layout panel, create a row and a column and give the column a name.  Change layout parameters as needed to create a reasonably big sized chart.  Finally, in the components panel, choose a CCC Bar Chart component and connect it to the layout (using the name given to the column as the "htmlObject") and to the datasource using the datasource name.  Save the dashboard, and then preview it.  If configured correctly, you should see a chart like the one shown in Figure 5.

Figure 5: The output of the histogram from step out: 2, which has a Standard Normal distribution.

 

In conclusion, we've demonstrated generating random data and binning it in histograms using a User Defined Java Class in PDI.  We also showed how to display these histograms in a CCC Bar Chart in a CDE dashboard.  The techniques demonstrated here can be extended to get histogram definitions from a configuration database managed by a web application, or even to output and clear histograms periodically to provide real-time displays.

 

 

I recently wrote about everything you needed to know about Pentaho 8. And now it is available! Go get your Enterprise Edition or trial version from the usual places.

 

For CE, you can find it on the new community home!

 


Enjoy!

 


-pedro