ActiveMQ, Spark & Bahir – Real-time data processing pipeline – Part 3

Having explained how to visually simulate sensor data and how to get it into ActiveMQ during the first two parts, it is now time to explore an initial setup that allows Apache Spark to read a data stream from ActiveMQ using Bahir.

Things to be aware of

Before we start, there are a couple of things you should be aware of, in case you want to follow along and try this out yourself.

Spark & Bahir version matching

Apache Bahir Spark Extensions 2.3.3

As stated above, I would like to subscribe to an ActiveMQ topic with Apache Bahir to transfer the associated data into Spark. At the time of writing, the Bahir Spark Extensions are available in version 2.3.3.

Because the version of the Bahir Spark Extensions needs to match the Spark version, I have created a new Docker image that is based on the existing Apache Zeppelin Docker image.

Here is the content of the Dockerfile that I’ve used:

FROM apache/zeppelin:0.8.1

ENV SPARK_VERSION=2.3.3
ENV HADOOP_VERSION=2.7
ENV SPARK_INSTALL_ROOT=/spark
ENV SPARK_HOME=${SPARK_INSTALL_ROOT}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}

RUN mkdir "${SPARK_INSTALL_ROOT}" && \
    cd "${SPARK_INSTALL_ROOT}" && \
    wget --show-progress --progress=bar:force:noscroll https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz && \
    tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz && \
    rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz
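
In case you want to try this yourself, the image can be built and started along these lines (the zeppelin-bahir tag and the port mapping are just my choices here, nothing mandated by the base image):

docker build -t zeppelin-bahir .
docker run -d -p 8080:8080 --name zeppelin zeppelin-bahir

Zeppelin should then be reachable on port 8080 of the Docker host.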

Spark JDBC sink?

Spark does not provide a JDBC sink out of the box, and judging by the reply to a related feature request, it does not seem likely that one will be added in the near future.

No direct JDBC support in Spark?

The good news is that Apache Bahir will come to the rescue here as well, as support for a JDBC sink seems to be available in the next release.

JDBC sink support should be available in the next version of Apache Bahir

Spark foreach sink…

The JDBC sink will obviously be quite handy, but until the next version of Bahir has been released, I will have to make use of the foreach sink mechanism provided by Apache Spark (a sketch follows below).

Unfortunately, Spark 2.3.3 does not support using the foreach sink from Python.
That’s not a huge problem though, as it is a great opportunity to also play around with Scala a bit.

In Spark 2.3.3 the foreach sink is only available for Scala and Java
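
To give an idea of what this looks like, below is a minimal sketch of a ForeachWriter that writes each row via plain JDBC. The connection URL, credentials and the sensor_data table are made-up placeholders, not the implementation used later in this series:

%spark

import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}

// sketch of a JDBC ForeachWriter; URL, credentials and table are placeholders
class JdbcSinkWriter(url: String, user: String, password: String) extends ForeachWriter[Row] {

    var connection: Connection = _
    var statement: PreparedStatement = _

    // called once per partition and epoch; returning true accepts the data
    override def open(partitionId: Long, version: Long): Boolean = {
        connection = DriverManager.getConnection(url, user, password)
        statement = connection.prepareStatement("INSERT INTO sensor_data (payload) VALUES (?)")
        true
    }

    // called once for every row of the partition
    override def process(row: Row): Unit = {
        statement.setString(1, row.toString)
        statement.executeUpdate()
    }

    // called after the partition has been processed, or when an error occurred
    override def close(errorOrNull: Throwable): Unit = {
        if (statement != null) statement.close()
        if (connection != null) connection.close()
    }
}

// usage sketch:
// mqttDf.writeStream.foreach(new JdbcSinkWriter(url, user, password)).start()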

Zeppelin Interpreter

As a last step to get things ready, I need to add the Bahir dependency to the Zeppelin Spark interpreter.
This is done via the Dependencies section at the bottom of the interpreter settings page.

Zeppelin Spark interpreter dependencies

In case you prefer to copy & paste:

org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.3 

If you want to know more about Zeppelin, please have a look at some of my related posts, or have a look at the Apache Zeppelin homepage.

Putting things together

ActiveMQ, Spark & Bahir

The first thing to do is to check whether I can actually get any data from ActiveMQ.

The Scala code below creates a new Spark session and uses Bahir’s MQTTStreamSourceProvider to connect to ActiveMQ on port 1883.
After that, the DataFrame schema is printed to get a better idea of the structure of the data that I am receiving.

In order to actually look at the data, I need to write it to a supported Spark output sink. The memory sink is great for debugging purposes, as long as you don’t exceed your system’s memory limit, of course.

awaitTermination is used to ensure that the stream is processed continuously until it is stopped or an error is thrown. To make this work in Zeppelin, it is important to specify a timeout value, so that the UI won’t be blocked forever.

%spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// create a named session
val spark = SparkSession
    .builder()
    .appName("ReadMQTTStream")
    .getOrCreate()

// read data from the OscStream topic
val mqttDf = spark.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("brokerUrl", "tcp://192.168.10.159:1883")
    .option("topic", "OscStream")
    .load()

// print the schema of the received data
mqttDf.printSchema

// write the stream to the memory sink for debugging
val memQuery = mqttDf.writeStream
    .queryName("memQuery")
    .format("memory")
    .start()

println("Starting...")
// the timeout is in milliseconds; returning here keeps the Zeppelin UI responsive
memQuery.awaitTermination(5)

Executing this gives me the following result:

The schema output confirms the successful connection to ActiveMQ

It shows the expected schema, which tells me that the connection to ActiveMQ was successful.
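
Since the memory sink registers the stream as an in-memory table named after the query, the received records can also be inspected with plain SQL. For example:

%spark

// the memory sink exposes the stream as an in-memory table called "memQuery"
spark.sql("SELECT * FROM memQuery").show(10, truncate = false)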

Streaming Query Monitoring & Management

Now let’s confirm that streaming is really up and running. The documentation explains all available management commands. I find the following most useful for getting a quick overview.

%spark

println("IsActive ..... => " + memQuery.isActive)
println("Status ....... => " + memQuery.status)
println("Last exception => " + memQuery.exception)
println("Last progress  => " + memQuery.lastProgress)

An active streaming query

This reveals some useful information. For example:

  • The stream is active
  • It is not processing any data at the moment, since I haven’t started IanniX yet
  • No exceptions so far. Great!

Knowing how I can spin up streaming is great, but I also want to be able to stop and reset the query, so that I can reuse it as needed.

Here are the commands that allow me to do that:

%spark

// stop streaming to the memory sink
memQuery.stop()

// reset any terminated streaming queries
spark.streams.resetTerminated()

Querying the status of the stream now reflects that.
IsActive is false and the status message is set to Stopped.

A stopped streaming query
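
After the reset, the same writeStream definition should simply be startable again whenever needed; a minimal sketch, reusing the mqttDf DataFrame from above:

%spark

// start a fresh run of the in-memory debug query
val memQuery = mqttDf.writeStream
    .queryName("memQuery")
    .format("memory")
    .start()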

Roundup & Outlook

In this part I have shown how to lay the foundation for getting data from ActiveMQ into Spark using Bahir.

In the next part I will continue using the memory sink to support the transformation of the received data into the destination format that will be written to the database.

That is it for now. Thanks for reading and hopefully you have found this useful.
