Ken Wood

Bi-Directional Communication With MQTT and PDI

Blog Post created by Ken Wood on Dec 5, 2017

"IoT Hello World"




It was brought to my attention at PCM2017 in November that I still owe the third blog on using the MQTT plugin for Pentaho Data Integration (PDI). So, thanks for reminding me. For those who haven’t been keeping track, two other blogs were written earlier this year: Pentaho + MQTT = IoT, which introduced the MQTT plugin, and Securing IoT Data with Pentaho’s MQTT Plugin, which applied some security techniques to MQTT. So, to close out this three-blog series, let’s kick it up a bit. I would like to describe a small project I’ve been working on that uses PDI and MQTT in a bi-directional communication configuration. This means using both the MQTT Publisher and MQTT Subscriber steps within the same transformation. I have tweeted snippets of this project throughout the year as I developed it, and now I’ll explain how it works. Refer to Diagram-1, which shows the full architecture of this project, which I call “IoT Hello World”. I use this project for demonstrations, for education, and as a way to generate streaming IoT data for participants in the #hitachicode1 hack-a-thon events.



Diagram-1: The overall architecture of project "Pentaho - IoT Hello World"


Project Overview


Here is a quick description of what this project does before I explain how it works. This is a 6 degree of freedom (DoF) robotic arm that performs several robotic routines: displaying "Hello" and "World", then picking up and placing a car on the track (refer to the video above). While the arm loops through these routines, its servo motors begin to heat up. Temperature sensors have been attached to all 6 servo motors so that their temperatures can be sensed and reported. The data stream generated from these readings is published in comma-separated values (CSV) format and includes,


  • Robotic Arm Controller (RAC) serial number (read from a configuration file)
  • A RAC system timestamp
  • The "Shoulder" servo motor temperature
  • The "Twist" servo motor temperature
  • The "Wrist" servo motor temperature
  • The "Elbow" servo motor temperature
  • The "Base" servo motor temperature
  • The "Claw" servo motor temperature
  • An MD5 hash of the first 8 fields of this message
  • The MQTT Topic that this message is published under


An example MQTT message looks like this and is published under the topic "Pen/Robot/Temp",


SN84112-2,2017-07-26 13:57:28,26.0,25.0,25.0,27.0,24.0,24.0,6c18c1515dea2c5ddfcc6c69a18cbedf,Pen/Robot/Temp
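
To make the message layout concrete, here is a minimal Python sketch that builds and parses a message with this 10-field layout. It is an illustration, not the project's code: the function names are hypothetical, and it assumes the MD5 hash is computed over the first 8 fields joined with commas, which the post does not spell out, so it may not reproduce the hash in the example above.

```python
import hashlib

# Field order follows the list above; the dictionary keys are illustrative names.
FIELD_NAMES = ["serial", "timestamp", "shoulder", "twist", "wrist",
               "elbow", "base", "claw", "md5", "topic"]


def build_message(serial, timestamp, temps, topic="Pen/Robot/Temp"):
    """Assemble a 10-field CSV message from a serial, a timestamp and 6 temperatures."""
    fields = [serial, timestamp] + [f"{t:.1f}" for t in temps]
    # ASSUMPTION: the hash covers the first 8 fields joined by commas (UTF-8).
    digest = hashlib.md5(",".join(fields).encode("utf-8")).hexdigest()
    return ",".join(fields + [digest, topic])


def parse_message(line):
    """Split a message into a dict and record whether its hash matches."""
    parts = line.strip().split(",")
    if len(parts) != len(FIELD_NAMES):
        raise ValueError(f"expected {len(FIELD_NAMES)} fields, got {len(parts)}")
    record = dict(zip(FIELD_NAMES, parts))
    expected = hashlib.md5(",".join(parts[:8]).encode("utf-8")).hexdigest()
    record["hash_ok"] = (expected == record["md5"])
    return record
```

A subscriber could call `parse_message` on each payload and discard records where `hash_ok` is false, which is a cheap way to catch truncated or altered messages.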


There is another message that is published by the RAC during startup. When the RAC is booted and the "Start" button is pushed, a registration message is published. This message "registers" the RAC with the Corporate Server Application, which stores it in the device registration table. The message consists of the following fields in CSV format,


  • RAC serial number (read from a configuration file)
  • A RAC description (read from a configuration file)
  • A RAC system timestamp
  • IP address of the RAC
  • The node name of the RAC
  • The PDI software version
  • Latitude
  • Longitude


An example of the device registration MQTT message looks like this and is published under the topic "Pen/Robot/DeviceRegister",


SN84112-2,SN84112-2 Robot Arm Hello World 6 DOF 6 Servos,2017-07-26 13:33:50.138,,ironman,,30.6291065,-100.995013667
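
As a sketch of how a subscriber might read this message, the Python below splits it into the eight fields listed above. The class and function names are hypothetical, and the code assumes the description never contains a comma. Empty fields, such as the IP address and PDI version in the example, are kept as `None`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Registration:
    serial: str
    description: str
    timestamp: str
    ip_address: Optional[str]
    node_name: Optional[str]
    pdi_version: Optional[str]
    latitude: float
    longitude: float


def parse_registration(line):
    """Parse one registration message; assumes no commas inside any field."""
    parts = line.strip().split(",")
    if len(parts) != 8:
        raise ValueError(f"expected 8 fields, got {len(parts)}")
    serial, description, timestamp, ip, node, pdi, lat, lon = parts
    # Empty strings become None so missing values are explicit.
    return Registration(serial, description, timestamp,
                        ip or None, node or None, pdi or None,
                        float(lat), float(lon))
```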


Again, refer to Diagram-1. The four main components of this project, the fourth being optional, are,


  1. The “Corporate Server Application”, which subscribes and publishes to several message queues as follows,
    1. Subscribes to the "Device Registration" message queue to receive a one time registration information message when the RAC comes online
    2. Subscribes to the "Device Data Stream" message queue for the operational information from the RAC
    3. Publishes the response to the "Corporate Response" message queue about corporate operating temperature specifications
    4. Publishes a message stream to the "Mobile Stream" message queue for a mobile application to monitor the robot arm remotely
  2. The Robot Controller Application and Sensor Monitoring, which registers the RAC with the Corporate Server, collects sensor data and other controller information, and manages all the control buttons, LED indicators and the robotic arm itself.
  3. The MQTT Broker, which is a free MQTT Broker at (but any MQTT Broker can be used) for hosting the four message queues used in this system. The four message queues used are defined as,
    1. Device Data Stream – used for the device data stream
    2. Device Registration – used for the device registration message
    3. Corp Response – used for the corporate server app to send messages back to the robot arm controller.
    4. Mobile Stream – a dedicated message stream for the mobile device app from the corporate server app
  4. A fourth component is a mobile app for remote monitoring. This is not required and the system will run with or without the mobile device present.


As mentioned earlier, while the robot arm is running its routines, the servo motors begin to heat up. The sensors attached to each servo motor are read continuously, and a status LED associated with the temperature readings provides a local visual status. This is called the “Local Vendor Temperature Status” and it covers a temperature range that corresponds to Green (OK), Yellow (WARNING) and Red (OVERHEAT) LEDs. The other set of status LEDs is called the “Corporate Temperature Specifications”. The Corporate Server Application subscribes to the RAC’s data stream in real time and responds based on a different definition of the Green (OK), Yellow (WARNING) and Red (OVERHEAT) conditions. Basically, the corporate temperature specifications are lower than the vendor temperature specifications.


I call this the “fishing boat situation”. If you've ever asked a fishing boat captain if the boat will go faster, they will respond with “...yes, the vessel is designed to go faster, but by running the engines at half speed, the engines will last twice as long”. This is the same situation here. The robot arm vendor will say that the robot can run at a higher temperature, but the owner of the robot arm (Corporation ABC) wants to operate it at a lower temperature or, in this case, to be alerted when the robot arm is running hotter than they want it to. This, of course, is just a simulated scenario in order to tell the story of what’s going on.
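
The two sets of LEDs can be thought of as the same classification run against two different threshold tables. The Python sketch below illustrates that idea only. The threshold numbers are invented for the example, since the post does not give the real vendor or corporate limits, and the function name is hypothetical.

```python
# HYPOTHETICAL thresholds; the actual vendor and corporate limits are not
# given in the post. The corporate limits are simply set lower.
VENDOR_SPEC = {"warning": 35.0, "overheat": 45.0}
CORPORATE_SPEC = {"warning": 26.0, "overheat": 30.0}


def led_status(temperature, spec):
    """Map a temperature reading to a status colour for one specification."""
    if temperature >= spec["overheat"]:
        return "RED"      # OVERHEAT
    if temperature >= spec["warning"]:
        return "YELLOW"   # WARNING
    return "GREEN"        # OK


reading = 27.0
vendor_led = led_status(reading, VENDOR_SPEC)        # within vendor limits
corporate_led = led_status(reading, CORPORATE_SPEC)  # above corporate warning
```

With these made-up limits, the same 27.0 reading is fine by the vendor's standard but already a warning by the corporate one, which is the fishing boat situation in miniature.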



The Robot Arm Controller


The robot, robotic arm controller (RAC) and monitoring application are implemented as a collection of orchestrated PDI transformations, executed through a job, that all run on a Raspberry Pi (RPi). These PDI components do the following,


  1. Registers the robot using its current GPS location (from a GPS module installed on the RPi), various system information, the device’s serial number and the device description. This registration information is formatted and published to the “Device Registration” message queue.
  2. The Data Collection component monitors the 6 temperature sensors that are installed on the RAC's servo motors and connected to the RPi. This component publishes the formatted message to the “Device Data Stream” message queue.
  3. Subscribes to the “Corp Response” message queue to receive the current corporate operating temperature specifications.
  4. PDI also orchestrates a suite of python programs that are used to do the physical IO associated with the RAC like,
    1. Turn LED indicators on and off
    2. Collect temperature readings from the temperature sensors
    3. Interface with the control buttons
    4. Collect the latitude and longitude data from the GPS module



All the physical general purpose IO (GPIO) interactions for LEDs, sensors, GPS coordinates, and robotic arm manipulation and control are handled by various Python programs executed from PDI. An example Python program for reading the 6 temperature sensors and outputting a list of temperatures looks like this. I picked this code sample because it reads all the sensors in parallel rather than sequentially. Each sensor takes several seconds to read, and the time adds up if they are read in sequence.
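
The following is a minimal sketch of the parallel-read idea, not the original program from the project. It assumes a thread pool is an acceptable way to overlap the slow reads, and it uses a hypothetical stub (`read_sensor_stub`) in place of the real hardware access, which depends on the sensor type and wiring.

```python
import time
from concurrent.futures import ThreadPoolExecutor

SENSOR_NAMES = ["Shoulder", "Twist", "Wrist", "Elbow", "Base", "Claw"]


def read_sensor_stub(name, delay=0.05):
    """HYPOTHETICAL stand-in for a slow hardware read of one sensor."""
    time.sleep(delay)   # a real sensor read may take seconds
    return 25.0         # fixed placeholder temperature


def read_all_parallel(names=SENSOR_NAMES, read_fn=read_sensor_stub):
    """Read every sensor concurrently and return temperatures in input order."""
    with ThreadPoolExecutor(max_workers=max(1, len(names))) as pool:
        # Executor.map returns results in the order of the inputs,
        # regardless of which read finishes first.
        return list(pool.map(read_fn, names))


if __name__ == "__main__":
    # Print one comma-separated line that a calling process could capture.
    print(",".join(f"{t:.1f}" for t in read_all_parallel()))
```

Threads are a reasonable fit here because each read spends its time waiting on I/O, so the total time is close to the slowest single read rather than the sum of all six.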



The Corporate Server


The Corporate Server plays the role of maintaining the company’s operational specification on preferred temperature levels versus the vendor's operating specification. Let’s take a closer look at the transformation running the Corporate Server. In the transformation shown below (see Transformation-1), both the MQTT Subscriber and MQTT Publisher steps are used simultaneously within the same transformation.


From left to right, the transformation subscribes to the topic “Pen/Robot/Temp” at the MQTT Broker. It then copies the message stream into four threads.


  1. This thread goes to another transformation that builds another message and publishes it to the "Mobile Stream" message queue.
  2. This thread splits the message into fields and updates a table in the database (the commit for this database is set to 1 so every record is immediately committed).
  3. This thread also gets a copy of the message, splits out the temperature values and does some calculations on the temperatures from the RAC's temperature sensors. The resulting value is then used in the Switch/Case step to compare it to a table of corporate-defined temperature values. Based on the result, an MQTT message is published to the “Corp Response” queue on the MQTT Broker with a Green (OK), Yellow (WARNING) or Red (OVERHEAT) status. This status is then used by the RAC to light the corporate LED indicators. This is what I’m referring to as bi-directional communication or full-duplex communication using MQTT: the transformation subscribes to a topic and receives a message, then publishes a message within the same transformation.
  4. This thread just dumps the raw messages straight into a local log file for posterity.
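
The receive-then-respond path of the third thread can be sketched in Python as a single handler that takes an incoming payload and returns the topic and payload to publish. This is an illustration under stated assumptions, not the PDI transformation itself: the response topic name, the response payload format, the corporate limits and the use of the maximum temperature as the calculated value are all hypothetical.

```python
RESPONSE_TOPIC = "Pen/Robot/CorpResponse"   # HYPOTHETICAL topic name

# HYPOTHETICAL corporate limits, checked from hottest to coolest.
CORP_LIMITS = [(30.0, "RED"), (26.0, "YELLOW")]


def handle_temp_message(payload):
    """Turn one data-stream message into a (topic, payload) response pair."""
    fields = payload.strip().split(",")
    serial = fields[0]
    temps = [float(value) for value in fields[2:8]]   # the 6 servo temperatures
    hottest = max(temps)   # ASSUMPTION: the post only says "some calculations"
    status = "GREEN"
    for limit, label in CORP_LIMITS:
        if hottest >= limit:
            status = label
            break
    # HYPOTHETICAL response format: serial number plus status colour.
    return RESPONSE_TOPIC, f"{serial},{status}"
```

In a real client, this function would be called from the subscriber's message callback, and the returned pair would be handed to the same client's publish call, which is what makes the exchange bi-directional.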



Transformation-1: The main transformation for the Corporate Server Application.


There is a second (floating) transformation within this main transformation for catching the Device Registration information when the RAC is started. This should work when there are multiple controllers using this system. In fact, the mobile device stream is a join of the Device Registration table (to get the GPS latitude and longitude and the robot arm controller description) and the table storing the temperature readings already in the database.
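
The join behind the mobile stream might look something like the following sketch, which uses an in-memory SQLite database from Python purely for illustration. The table names, column names and choice of database are assumptions, since the post does not describe the actual schema.

```python
import sqlite3


def build_mobile_rows(conn):
    """Join each temperature reading with its device's registration details."""
    return conn.execute(
        """
        SELECT t.serial, r.description, r.latitude, r.longitude,
               t.reading_time, t.shoulder, t.twist, t.wrist,
               t.elbow, t.base, t.claw
        FROM temperature_readings AS t
        JOIN device_registration AS r ON r.serial = t.serial
        ORDER BY t.reading_time
        """
    ).fetchall()


def demo():
    """Create HYPOTHETICAL tables, load the example values, and run the join."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE device_registration "
                 "(serial TEXT PRIMARY KEY, description TEXT, "
                 "latitude REAL, longitude REAL)")
    conn.execute("CREATE TABLE temperature_readings "
                 "(serial TEXT, reading_time TEXT, shoulder REAL, twist REAL, "
                 "wrist REAL, elbow REAL, base REAL, claw REAL)")
    conn.execute("INSERT INTO device_registration VALUES (?, ?, ?, ?)",
                 ("SN84112-2", "SN84112-2 Robot Arm Hello World 6 DOF 6 Servos",
                  30.6291065, -100.995013667))
    conn.execute("INSERT INTO temperature_readings VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                 ("SN84112-2", "2017-07-26 13:57:28",
                  26.0, 25.0, 25.0, 27.0, 24.0, 24.0))
    return build_mobile_rows(conn)
```

Because the join is keyed on the serial number, readings from several controllers would each pick up their own location and description.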


That's it! The entire process on the RAC is set up when the Raspberry Pi boots up. The push button interface kicks off scripts that launch Kitchen to start the RAC application. The Corporate Server Application is started with a script that launches Pan.


This is a great project that has great IoT demonstration value, plus it was just a blast to do. It is also portable, so I can travel with it as necessary. I have started on a second version of this robot concept with many more sensors, more complicated routines and other updated components. I am looking forward to showing that project some day soon.


Let me know if you have any questions or comments. I know this project is less industrial and more hobby-ish in nature, but it is a great tool for demonstrating complex concepts with Pentaho and generating real IoT data streams.