.NET for Apache Spark – MQTT Streaming

.NET for Apache Spark 0.4.0 was released recently, so it is time to test whether it can be used for MQTT Streaming as well.

If you followed my series about a real-time data processing pipeline, you probably remember that I used Apache Bahir to retrieve streaming data from Apache ActiveMQ via the MQTT protocol. The data itself was generated by IanniX and forwarded to ActiveMQ using my osc2activemq docker image.

Preparing .NET for Apache Spark for MQTT Streaming

For the most part, you can follow this quick intro tutorial, which walks you through installing .NET for Apache Spark on either Windows or Linux.
There is one important caveat, though:

Apache Bahir, which I use for MQTT Streaming into Spark, is currently available in version 2.3.3 and only works with Apache Spark of the same version.

Download Apache Spark 2.3.3

Keeping that in mind, you can either download Apache Spark 2.3.3 directly from the download page, or just use my Linux docker image tagged with 0.4.0-spark-2.3.3-linux.

Building the C# application with .NET Core

Having prepared your test environment, you are now ready to create a new .NET Core project by running the following command in your cmd window or shell:

dotnet new console -o ActiveMqTest

Once the command has created the new folder with the project files, change into it and add the Microsoft.Spark package by entering the following:

dotnet add package Microsoft.Spark --version 0.4.0

Next, open the Program.cs file and replace its contents with the code listed below.

using Microsoft.Spark.Sql;

namespace ActiveMqTest
{
    class Program
    {
        static void Main(string[] args)
        {
            // Create a Spark session
            var spark = SparkSession
                .Builder()
                .AppName("ActiveMqTest")
                .GetOrCreate();

            // Connect via MQTT
            var mqttDf = spark.ReadStream()
                .Format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
                .Option("brokerUrl", "tcp://192.168.10.159:1883")
                .Option("clientId", "ActiveMqTest")
                .Option("topic", "OscStream")
                .Load();

            mqttDf.PrintSchema();

            // Write any received data to the console
            var query = mqttDf.WriteStream()
                .Format("console")
                .Start();

            // Run until terminated
            query.AwaitTermination();
        }
    }
}
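Program.cs hard-codes the broker URL and the topic. If you would rather not rebuild whenever the broker moves, the settings can come from the command line instead. The following is a minimal sketch of a hypothetical helper, MqttSettings (the class name, the argument order and the scheme check are assumptions of this sketch, not part of .NET for Apache Spark), that falls back to the values used above:

```csharp
using System;

namespace ActiveMqTest
{
    // Hypothetical helper (not part of Microsoft.Spark): resolves the MQTT
    // connection settings from the command-line arguments passed to Main,
    // falling back to the values hard-coded in Program.cs.
    public sealed class MqttSettings
    {
        public string BrokerUrl { get; }
        public string Topic { get; }

        private MqttSettings(string brokerUrl, string topic)
        {
            BrokerUrl = brokerUrl;
            Topic = topic;
        }

        // Assumed argument order for this sketch: [brokerUrl] [topic]
        public static MqttSettings FromArgs(string[] args)
        {
            string brokerUrl = args.Length > 0 ? args[0] : "tcp://192.168.10.159:1883";
            string topic = args.Length > 1 ? args[1] : "OscStream";

            // Reject obviously wrong values early; tcp:// and ssl:// are the
            // usual schemes for an MQTT broker URL (an assumption of this sketch).
            if (!Uri.TryCreate(brokerUrl, UriKind.Absolute, out Uri uri)
                || (uri.Scheme != "tcp" && uri.Scheme != "ssl"))
            {
                throw new ArgumentException($"Expected a tcp:// or ssl:// broker URL, got '{brokerUrl}'.");
            }

            return new MqttSettings(brokerUrl, topic);
        }
    }
}
```

In Main you would call MqttSettings.FromArgs(args) and pass settings.BrokerUrl and settings.Topic to the two corresponding .Option(...) calls. With spark-submit, any arguments placed after ActiveMqTest.dll are forwarded to Main.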

Save the file and build the project.

dotnet build

Submitting to Spark

After the application has built successfully, you can submit it to Spark using the following command:

spark-submit --packages org.apache.bahir:spark-sql-streaming-mqtt_2.11:2.3.3 --class org.apache.spark.deploy.dotnet.DotnetRunner --master local bin\Debug\netcoreapp2.2\microsoft-spark-2.3.x-0.4.0.jar dotnet bin\Debug\netcoreapp2.2\ActiveMqTest.dll

The --packages parameter makes spark-submit download the required Apache Bahir MQTT Streaming provider library automatically. Once downloaded, it is also made available to the submitted job automatically.
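The value passed to --packages is a Maven coordinate of the form groupId:artifactId:version. The _2.11 suffix on the artifact name is the Scala binary version that Spark 2.3.x is built with, and the version part has to match your Spark version, as noted earlier for Bahir 2.3.3. The hypothetical helper below only makes that structure explicit; it assumes a Bahir release exists for the Spark version you pass in.

```csharp
using System;

namespace ActiveMqTest
{
    // Hypothetical helper that assembles the --packages coordinate for the
    // Bahir MQTT Structured Streaming source.
    public static class BahirPackage
    {
        // Maven coordinate: groupId:artifactId_<scala binary version>:version.
        // The Bahir version is set equal to the Spark version, because Bahir
        // only works with the Spark release of the same version.
        public static string MqttCoordinate(string sparkVersion, string scalaBinaryVersion = "2.11")
        {
            if (string.IsNullOrWhiteSpace(sparkVersion))
            {
                throw new ArgumentException("A Spark version is required.", nameof(sparkVersion));
            }

            return $"org.apache.bahir:spark-sql-streaming-mqtt_{scalaBinaryVersion}:{sparkVersion}";
        }
    }
}
```

For Spark 2.3.3 this yields exactly the coordinate used in the spark-submit command above.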

Here is an excerpt of the console output on my Windows VM after submitting.

Console output after submitting to Spark

Getting valid output from PrintSchema() tells me that I could successfully connect to ActiveMQ via MQTT.
The streaming query progress provides more details about each batch of the stream.

So now it is time to start up IanniX and generate some data in real time. Let’s watch the console output.

.NET for Apache Spark receiving MQTT Streaming data
The job is receiving data

Voilà, the data arrives. Great news!

Outlook

Just writing streaming data to the console is a good way of verifying that I can read from a stream. But of course I want to do more than just that.

So stay tuned for one of my next posts, in which I plan to find out how .NET for Apache Spark can be used to

  • transform streaming data
  • write the transformed data into a database

So have a great time, and I hope to see you again soon.
